Merge remote-tracking branch 'origin/master' into feat/async-restartable-workers

This commit is contained in:
Łukasz Magiera 2020-10-01 02:39:48 +02:00
commit 5932f28519
19 changed files with 297 additions and 187 deletions

View File

@ -280,7 +280,7 @@ type FullNode interface {
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error)
// ClientCalcCommP calculates the CommP for a specified file
ClientCalcCommP(ctx context.Context, inpath string) (*CommPRet, error)
// ClientGenCar generates a CAR file for the specified file.

View File

@ -156,7 +156,7 @@ type FullNodeStruct struct {
ClientGetDealUpdates func(ctx context.Context) (<-chan api.DealInfo, error) `perm:"read"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
@ -497,7 +497,7 @@ func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api
return c.Internal.ClientRetrieveWithEvents(ctx, order, ref)
}
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner)
}
func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {

View File

@ -569,7 +569,7 @@ func interactiveDeal(cctx *cli.Context) error {
continue
}
ask = *a.Ask
ask = *a
// TODO: run more validation
state = "confirm"
@ -951,15 +951,15 @@ var clientQueryAskCmd = &cli.Command{
}
fmt.Printf("Ask: %s\n", maddr)
fmt.Printf("Price per GiB: %s\n", types.FIL(ask.Ask.Price))
fmt.Printf("Verified Price per GiB: %s\n", types.FIL(ask.Ask.VerifiedPrice))
fmt.Printf("Max Piece size: %s\n", types.SizeStr(types.NewInt(uint64(ask.Ask.MaxPieceSize))))
fmt.Printf("Price per GiB: %s\n", types.FIL(ask.Price))
fmt.Printf("Verified Price per GiB: %s\n", types.FIL(ask.VerifiedPrice))
fmt.Printf("Max Piece size: %s\n", types.SizeStr(types.NewInt(uint64(ask.MaxPieceSize))))
size := cctx.Int64("size")
if size == 0 {
return nil
}
perEpoch := types.BigDiv(types.BigMul(ask.Ask.Price, types.NewInt(uint64(size))), types.NewInt(1<<30))
perEpoch := types.BigDiv(types.BigMul(ask.Price, types.NewInt(uint64(size))), types.NewInt(1<<30))
fmt.Printf("Price per Block: %s\n", types.FIL(perEpoch))
duration := cctx.Int64("duration")
@ -1423,7 +1423,7 @@ func toChannelOutput(useColor bool, otherPartyColumn string, channel lapi.DataTr
otherPartyColumn: otherParty,
"Root Cid": rootCid,
"Initiated?": initiated,
"Transferred": channel.Transferred,
"Transferred": units.BytesSize(float64(channel.Transferred)),
"Voucher": voucher,
"Message": channel.Message,
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/hako/durafmt"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
@ -36,11 +37,11 @@ func parseTipSet(ctx context.Context, api api.FullNode, vals []string) (*types.T
func EpochTime(curr, e abi.ChainEpoch) string {
switch {
case curr > e:
return fmt.Sprintf("%d (%s ago)", e, time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e)))
return fmt.Sprintf("%d (%s ago)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(curr-e))).LimitFirstN(2))
case curr == e:
return fmt.Sprintf("%d (now)", e)
case curr < e:
return fmt.Sprintf("%d (in %s)", e, time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr)))
return fmt.Sprintf("%d (in %s)", e, durafmt.Parse(time.Second*time.Duration(int64(build.BlockDelaySecs)*int64(e-curr))).LimitFirstN(2))
}
panic("math broke")

View File

@ -5,19 +5,22 @@ import (
"os"
"sort"
"strconv"
"text/tabwriter"
"time"
"github.com/docker/go-units"
"github.com/fatih/color"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/tablewriter"
lcli "github.com/filecoin-project/lotus/cli"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
@ -144,8 +147,19 @@ var sectorsListCmd = &cli.Command{
Name: "show-removed",
Usage: "show removed sectors",
},
&cli.BoolFlag{
Name: "color",
Aliases: []string{"c"},
Value: true,
},
&cli.BoolFlag{
Name: "fast",
Usage: "don't show on-chain info for better performance",
},
},
Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color")
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
@ -170,7 +184,12 @@ var sectorsListCmd = &cli.Command{
return err
}
activeSet, err := fullApi.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
head, err := fullApi.ChainHead(ctx)
if err != nil {
return err
}
activeSet, err := fullApi.StateMinerActiveSectors(ctx, maddr, head.Key())
if err != nil {
return err
}
@ -179,7 +198,7 @@ var sectorsListCmd = &cli.Command{
activeIDs[info.SectorNumber] = struct{}{}
}
sset, err := fullApi.StateMinerSectors(ctx, maddr, nil, types.EmptyTSK)
sset, err := fullApi.StateMinerSectors(ctx, maddr, nil, head.Key())
if err != nil {
return err
}
@ -192,12 +211,26 @@ var sectorsListCmd = &cli.Command{
return list[i] < list[j]
})
w := tabwriter.NewWriter(os.Stdout, 8, 4, 1, ' ', 0)
tw := tablewriter.New(
tablewriter.Col("ID"),
tablewriter.Col("State"),
tablewriter.Col("OnChain"),
tablewriter.Col("Active"),
tablewriter.Col("Expiration"),
tablewriter.Col("Deals"),
tablewriter.Col("DealWeight"),
tablewriter.NewLineCol("Error"),
tablewriter.NewLineCol("EarlyExpiration"))
fast := cctx.Bool("fast")
for _, s := range list {
st, err := nodeApi.SectorsStatus(ctx, s, false)
st, err := nodeApi.SectorsStatus(ctx, s, !fast)
if err != nil {
fmt.Fprintf(w, "%d:\tError: %s\n", s, err)
tw.Write(map[string]interface{}{
"ID": s,
"Error": err,
})
continue
}
@ -205,20 +238,60 @@ var sectorsListCmd = &cli.Command{
_, inSSet := commitedIDs[s]
_, inASet := activeIDs[s]
_, _ = fmt.Fprintf(w, "%d: %s\tsSet: %s\tactive: %s\ttktH: %d\tseedH: %d\ttoUpgrade: %t\tdeals: %v\n",
s,
st.State,
yesno(inSSet),
yesno(inASet),
st.Ticket.Epoch,
st.Seed.Epoch,
st.ToUpgrade,
st.Deals,
)
dw := .0
if st.Expiration-st.Activation > 0 {
dw = float64(big.Div(st.DealWeight, big.NewInt(int64(st.Expiration-st.Activation))).Uint64())
}
var deals int
for _, deal := range st.Deals {
if deal != 0 {
deals++
}
}
exp := st.Expiration
if st.OnTime > 0 && st.OnTime < exp {
exp = st.OnTime // Can be different when the sector was CC upgraded
}
m := map[string]interface{}{
"ID": s,
"State": color.New(stateOrder[sealing.SectorState(st.State)].col).Sprint(st.State),
"OnChain": yesno(inSSet),
"Active": yesno(inASet),
}
if deals > 0 {
m["Deals"] = color.GreenString("%d", deals)
} else {
m["Deals"] = color.BlueString("CC")
if st.ToUpgrade {
m["Deals"] = color.CyanString("CC(upgrade)")
}
}
if !fast {
if !inSSet {
m["Expiration"] = "n/a"
} else {
m["Expiration"] = lcli.EpochTime(head.Height(), exp)
if !fast && deals > 0 {
m["DealWeight"] = units.BytesSize(dw)
}
if st.Early > 0 {
m["EarlyExpiration"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early))
}
}
}
tw.Write(m)
}
}
return w.Flush()
return tw.Flush(os.Stdout)
},
}
@ -447,7 +520,7 @@ var sectorsUpdateCmd = &cli.Command{
func yesno(b bool) string {
if b {
return "YES"
return color.GreenString("YES")
}
return "NO"
return color.RedString("NO")
}

View File

@ -1122,20 +1122,14 @@ Inputs:
Response:
```json
{
"Ask": {
"Price": "0",
"VerifiedPrice": "0",
"MinPieceSize": 1032,
"MaxPieceSize": 1032,
"Miner": "t01234",
"Timestamp": 10101,
"Expiry": 10101,
"SeqNo": 42
},
"Signature": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
}
"Price": "0",
"VerifiedPrice": "0",
"MinPieceSize": 1032,
"MaxPieceSize": 1032,
"Miner": "t01234",
"Timestamp": 10101,
"Expiry": 10101,
"SeqNo": 42
}
```

View File

@ -10,14 +10,38 @@ type Resources struct {
MinMemory uint64 // What Must be in RAM for decent perf
MaxMemory uint64 // Memory required (swap + ram)
Threads int // -1 = multithread
CanGPU bool
MaxParallelism int // -1 = multithread
CanGPU bool
BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
}
func (r Resources) MultiThread() bool {
return r.Threads == -1
/*
Percent of threads to allocate to parallel tasks
12 * 0.92 = 11
16 * 0.92 = 14
24 * 0.92 = 22
32 * 0.92 = 29
64 * 0.92 = 58
128 * 0.92 = 117
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64) uint64 {
if r.MaxParallelism == -1 {
n := (wcpus * ParallelNum) / ParallelDenom
if n == 0 {
return wcpus
}
return n
}
return uint64(r.MaxParallelism)
}
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
@ -26,7 +50,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 30,
MinMemory: 8 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -34,7 +58,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 4 << 30,
MinMemory: 4 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -42,7 +66,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -50,7 +74,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 2 << 10,
},
@ -58,7 +82,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 8 << 20,
},
@ -68,7 +92,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 128 << 30,
MinMemory: 112 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 10 << 20,
},
@ -76,7 +100,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 64 << 30,
MinMemory: 56 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 10 << 20,
},
@ -84,7 +108,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 768 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 20,
},
@ -92,7 +116,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 2 << 10,
},
@ -100,35 +124,35 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 8 << 20,
},
},
sealtasks.TTPreCommit2: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
MaxMemory: 64 << 30,
MinMemory: 64 << 30,
MaxMemory: 30 << 30,
MinMemory: 30 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 60 << 30,
BaseMinMemory: 1 << 30,
},
abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
MaxMemory: 32 << 30,
MinMemory: 32 << 30,
MaxMemory: 15 << 30,
MinMemory: 15 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 30 << 30,
BaseMinMemory: 1 << 30,
},
abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 1 << 30,
},
@ -136,7 +160,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 2 << 10,
},
@ -144,7 +168,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 8 << 20,
},
@ -154,7 +178,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -162,7 +186,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -170,7 +194,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -178,7 +202,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 2 << 10,
},
@ -186,7 +210,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 8 << 20,
},
@ -196,8 +220,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 190 << 30, // TODO: Confirm
MinMemory: 60 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 64 << 30, // params
},
@ -205,8 +229,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 150 << 30, // TODO: ~30G of this should really be BaseMaxMemory
MinMemory: 30 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 32 << 30, // params
},
@ -214,8 +238,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
Threads: 1, // This is fine
CanGPU: true,
MaxParallelism: 1, // This is fine
CanGPU: true,
BaseMinMemory: 10 << 30,
},
@ -223,8 +247,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
CanGPU: true,
MaxParallelism: 1,
CanGPU: true,
BaseMinMemory: 2 << 10,
},
@ -232,8 +256,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
CanGPU: true,
MaxParallelism: 1,
CanGPU: true,
BaseMinMemory: 8 << 20,
},
@ -243,8 +267,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -252,8 +276,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -261,8 +285,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -270,8 +294,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -279,8 +303,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},

View File

@ -28,12 +28,7 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResource
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
a.gpuUsed = r.CanGPU
if r.MultiThread() {
a.cpuUse += wr.CPUs
} else {
a.cpuUse += uint64(r.Threads)
}
a.cpuUse += r.Threads(wr.CPUs)
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
}
@ -42,12 +37,7 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = false
}
if r.MultiThread() {
a.cpuUse -= wr.CPUs
} else {
a.cpuUse -= uint64(r.Threads)
}
a.cpuUse -= r.Threads(wr.CPUs)
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
}
@ -68,16 +58,9 @@ func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, call
return false
}
if needRes.MultiThread() {
if a.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs)
return false
}
} else {
if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs)
return false
}
if a.cpuUse+needRes.Threads(res.CPUs) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs), a.cpuUse, res.CPUs)
return false
}
if len(res.GPUs) > 0 && needRes.CanGPU {

View File

@ -288,6 +288,9 @@ func TestSched(t *testing.T) {
}
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
ParallelNum = 1
ParallelDenom = 1
return func(t *testing.T) {
index := stores.NewIndex()

9
go.mod
View File

@ -26,15 +26,15 @@ require (
github.com/filecoin-project/go-bitfield v0.2.1-0.20200920172649-837cbe6a1ed3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.6.6
github.com/filecoin-project/go-data-transfer v0.6.7
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.6.3
github.com/filecoin-project/go-fil-markets v0.7.0
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc
github.com/filecoin-project/go-statemachine v0.0.0-20200813232949-df9b130df370
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/specs-actors v0.9.11
@ -46,6 +46,7 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
@ -115,7 +116,7 @@ require (
github.com/syndtr/goleveldb v1.0.0
github.com/urfave/cli/v2 v2.2.0
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.0.0-20200814224545-656e08ce49ee
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542

20
go.sum
View File

@ -236,12 +236,14 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.6.6 h1:2TccLSxPYJENcYRdov2WvpTvQ1qUMrPkWe8sBrfj36g=
github.com/filecoin-project/go-data-transfer v0.6.6/go.mod h1:C++k1U6+jMQODOaen5OPDo9XQbth9Yq3ie94vNjBJbk=
github.com/filecoin-project/go-data-transfer v0.6.7 h1:Kacr5qz2YWtd3sensU6aXFtES7joeapVDeXApeUD35I=
github.com/filecoin-project/go-data-transfer v0.6.7/go.mod h1:C++k1U6+jMQODOaen5OPDo9XQbth9Yq3ie94vNjBJbk=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.6.3 h1:3kTxfquGvk3zQY+hJH1kEA28tRQ47phqSRqOI4+YcQM=
github.com/filecoin-project/go-fil-markets v0.6.3/go.mod h1:Ug1yhGhzTYC6qrpKsR2QpU8QRCeBpwkTA9RICVKuOMM=
github.com/filecoin-project/go-fil-markets v0.7.0 h1:tcEZiUNIYQJ4PBzgVpLwfdJ4ZdC4WCv9LsgvsoCXIls=
github.com/filecoin-project/go-fil-markets v0.7.0/go.mod h1:5Pt4DXQqUoUrp9QzlSdlYTpItXxwAtqKrxRWQ6hAOqk=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
@ -260,8 +262,8 @@ github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df/go
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc h1:1vr/LoqGq5m5g37Q3sNSAjfwF1uJY0zmiHcvnxY6hik=
github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200813232949-df9b130df370 h1:Jbburj7Ih2iaJ/o5Q9A+EAeTabME6YII7FLi9SKUf5c=
github.com/filecoin-project/go-statemachine v0.0.0-20200813232949-df9b130df370/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe h1:dF8u+LEWeIcTcfUcCf3WFVlc81Fr2JKg8zPzIbBDKDw=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
@ -415,6 +417,8 @@ github.com/gxed/go-shellwords v1.0.3/go.mod h1:N7paucT91ByIjmVJHhvoarjoQnmsi3Jd3
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE=
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026 h1:BpJ2o0OR5FV7vrkDYfXYVJQeMNWa8RhklZOpW2ITAIQ=
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE=
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 h1:F9k+7wv5OIk1zcq23QpdiL0hfDuXPjuOmMNaC6fgQ0Q=
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
@ -1394,8 +1398,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d/go.mod h1:f
github.com/whyrusleeping/cbor-gen v0.0.0-20200715143311-227fab5a2377/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20200810223238-211df3b9e24c/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20200812213548-958ddffe352c/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20200814224545-656e08ce49ee h1:U7zWWvvAjT76EiuWPSOiZlQDnaQYPxPoxugTtTAcJK0=
github.com/whyrusleeping/cbor-gen v0.0.0-20200814224545-656e08ce49ee/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163 h1:TtcUeY2XZSriVWR1pXyfCBWIf/NGC2iUdNw1lofUjUU=
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/whyrusleeping/go-ctrlnet v0.0.0-20180313164037-f564fbbdaa95/go.mod h1:SJqKCCPXRfBFCwXjfNT/skfsceF7+MBFLI2OrvuRA7g=

View File

@ -12,6 +12,7 @@ import (
type Column struct {
Name string
SeparateLine bool
Lines int
}
type TableWriter struct {
@ -50,6 +51,7 @@ cloop:
for i, column := range w.cols {
if column.Name == col {
byColID[i] = fmt.Sprint(val)
w.cols[i].Lines++
continue cloop
}
}
@ -58,6 +60,7 @@ cloop:
w.cols = append(w.cols, Column{
Name: col,
SeparateLine: false,
Lines: 1,
})
}
@ -77,7 +80,11 @@ func (w *TableWriter) Flush(out io.Writer) error {
w.rows = append([]map[int]string{header}, w.rows...)
for col := range w.cols {
for col, c := range w.cols {
if c.Lines == 0 {
continue
}
for _, row := range w.rows {
val, found := row[col]
if !found {
@ -94,9 +101,13 @@ func (w *TableWriter) Flush(out io.Writer) error {
cols := make([]string, len(w.cols))
for ci, col := range w.cols {
if col.Lines == 0 {
continue
}
e, _ := row[ci]
pad := colLengths[ci] - cliStringLength(e) + 2
if !col.SeparateLine {
if !col.SeparateLine && col.Lines > 0 {
e = e + strings.Repeat(" ", pad)
if _, err := fmt.Fprint(out, e); err != nil {
return err

View File

@ -29,6 +29,17 @@ func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrieval
log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}
// ReadyLogger returns a function to log the results of module initialization
func ReadyLogger(module string) func(error) {
return func(err error) {
if err != nil {
log.Errorw("module initialization error", "module", module, "err", err)
} else {
log.Infow("module ready", "module", module)
}
}
}
type RetrievalEvent struct {
Event retrievalmarket.ClientEvent
Status retrievalmarket.DealStatus

View File

@ -438,37 +438,6 @@ func (c *ClientNodeAdapter) GetDefaultWalletAddress(ctx context.Context) (addres
return addr, err
}
func (c *ClientNodeAdapter) ValidateAskSignature(ctx context.Context, ask *storagemarket.SignedStorageAsk, encodedTs shared.TipSetToken) (bool, error) {
tsk, err := types.TipSetKeyFromBytes(encodedTs)
if err != nil {
return false, err
}
mi, err := c.StateMinerInfo(ctx, ask.Ask.Miner, tsk)
if err != nil {
return false, xerrors.Errorf("failed to get worker for miner in ask: %w", err)
}
sigb, err := cborutil.Dump(ask.Ask)
if err != nil {
return false, xerrors.Errorf("failed to re-serialize ask")
}
ts, err := c.ChainGetTipSet(ctx, tsk)
if err != nil {
return false, xerrors.Errorf("failed to load tipset")
}
m, err := c.StateManager.ResolveToKeyAddress(ctx, mi.Worker, ts)
if err != nil {
return false, xerrors.Errorf("failed to resolve miner to key address")
}
err = sigs.Verify(ask.Signature, m, sigb)
return err == nil, err
}
func (c *ClientNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
head, err := c.ChainHead(ctx)
if err != nil {

View File

@ -20,8 +20,9 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/discovery"
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
@ -293,8 +294,8 @@ func Online() Option {
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(new(*discovery.Local), modules.NewLocalDiscovery),
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),
Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),

View File

@ -33,6 +33,7 @@ import (
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/discovery"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -70,7 +71,7 @@ type API struct {
paych.PaychAPI
SMDealClient storagemarket.StorageClient
RetDiscovery rm.PeerResolver
RetDiscovery discovery.PeerResolver
Retrieval rm.RetrievalClient
Chain *store.ChainStore
@ -614,18 +615,18 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed getting miner info: %w", err)
}
info := utils.NewStorageProviderInfo(miner, mi.Worker, mi.SectorSize, p, mi.Multiaddrs)
signedAsk, err := a.SMDealClient.GetAsk(ctx, info)
ask, err := a.SMDealClient.GetAsk(ctx, info)
if err != nil {
return nil, err
}
return signedAsk, nil
return ask, nil
}
func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {

View File

@ -12,8 +12,9 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-fil-markets/discovery"
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -112,12 +113,13 @@ func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
}
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds) (storagemarket.StorageClient, error) {
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second))
if err != nil {
return nil, err
}
c.OnReady(marketevents.ReadyLogger("storage client"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
c.SubscribeToEvents(marketevents.StorageClientLogger)
@ -135,7 +137,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver retrievalmarket.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI) (retrievalmarket.RetrievalClient, error) {
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
@ -143,6 +145,7 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
if err != nil {
return nil, err
}
client.OnReady(marketevents.ReadyLogger("retrieval client"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
@ -150,7 +153,7 @@ func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore,
evtType := journal.J.RegisterEventType("markets/retrieval/client", "state_change")
client.SubscribeToEvents(markets.RetrievalClientJournaler(evtType))
return nil
return client.Start(ctx)
},
})
return client, nil

View File

@ -13,8 +13,9 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/discovery"
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
@ -26,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/peermgr"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -117,12 +119,22 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
return discovery.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {
local, err := discoveryimpl.NewLocal(namespace.Wrap(ds, datastore.NewKey("/deals/local")))
if err != nil {
return nil, err
}
local.OnReady(marketevents.ReadyLogger("discovery"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return local.Start(ctx)
},
})
return local, nil
}
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
return discovery.Multi(l)
func RetrievalResolver(l *discoveryimpl.Local) discovery.PeerResolver {
return discoveryimpl.Multi(l)
}
type RandomBeaconParams struct {

View File

@ -29,10 +29,11 @@ import (
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
@ -216,14 +217,16 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
}
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
OnStart: func(ctx context.Context) error {
m.SubscribeToEvents(marketevents.RetrievalProviderLogger)
evtType := journal.J.RegisterEventType("markets/retrieval/provider", "state_change")
m.SubscribeToEvents(markets.RetrievalProviderJournaler(evtType))
return m.Start()
return m.Start(ctx)
},
OnStop: func(context.Context) error {
return m.Stop()
@ -233,7 +236,7 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) {
ctx := helpers.LifecycleCtx(mctx, lc)
h.OnReady(marketevents.ReadyLogger("storage provider"))
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
h.SubscribeToEvents(marketevents.StorageProviderLogger)
@ -275,8 +278,18 @@ func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.S
// NewProviderPieceStore creates a statestore for storing metadata about pieces
// shared by the storage and retrieval providers
func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore {
return piecestore.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket")))
func NewProviderPieceStore(lc fx.Lifecycle, ds dtypes.MetadataDS) (dtypes.ProviderPieceStore, error) {
ps, err := piecestoreimpl.NewPieceStore(namespace.Wrap(ds, datastore.NewKey("/storagemarket")))
if err != nil {
return nil, err
}
ps.OnReady(marketevents.ReadyLogger("piecestore"))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return ps.Start(ctx)
},
})
return ps, nil
}
func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
@ -371,7 +384,13 @@ func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.Metadat
return nil, err
}
storedAsk, err := storedask.NewStoredAsk(namespace.Wrap(ds, datastore.NewKey("/deals/provider")), datastore.NewKey("latest-ask"), spn, address.Address(minerAddress))
providerDs := namespace.Wrap(ds, datastore.NewKey("/deals/provider"))
// legacy this was mistake where this key was place -- so we move the legacy key if need be
err = shared.MoveKey(providerDs, "/latest-ask", "/storage-ask/latest")
if err != nil {
return nil, err
}
storedAsk, err := storedask.NewStoredAsk(namespace.Wrap(providerDs, datastore.NewKey("/storage-ask")), datastore.NewKey("latest"), spn, address.Address(minerAddress))
if err != nil {
return nil, err
}