Merge branch 'next' into feat/genesis-rootkey

This commit is contained in:
Jose Pablo Fernandez 2020-07-28 11:53:00 +02:00 committed by GitHub
commit 834ce171d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 503 additions and 442 deletions

View File

@ -57,7 +57,7 @@ type FullNode interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]Message, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]Message, error)
// ChainGetTipSetByHeight looks back for a tipset at the specified epoch. // ChainGetTipSetByHeight looks back for a tipset at the specified epoch.
// If there are no blocks at the specified epoch, a tipset at higher epoch // If there are no blocks at the specified epoch, a tipset at an earlier epoch
// will be returned. // will be returned.
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)

View File

@ -30,7 +30,7 @@ type StorageMiner interface {
PledgeSector(context.Context) error PledgeSector(context.Context) error
// Get the status of a given sector by ID // Get the status of a given sector by ID
SectorsStatus(context.Context, abi.SectorNumber) (SectorInfo, error) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
// List all staged sectors // List all staged sectors
SectorsList(context.Context) ([]abi.SectorNumber, error) SectorsList(context.Context) ([]abi.SectorNumber, error)
@ -63,6 +63,9 @@ type StorageMiner interface {
WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error) WorkerStats(context.Context) (map[uint64]storiface.WorkerStats, error)
WorkerJobs(context.Context) (map[uint64][]storiface.WorkerJob, error) WorkerJobs(context.Context) (map[uint64][]storiface.WorkerJob, error)
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(context.Context) (interface{}, error)
stores.SectorIndex stores.SectorIndex
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
@ -117,11 +120,24 @@ type SectorInfo struct {
LastErr string LastErr string
Log []SectorLog Log []SectorLog
// On Chain Info
SealProof abi.RegisteredSealProof // The seal proof type implies the PoSt proof/s
Activation abi.ChainEpoch // Epoch during which the sector proof was accepted
Expiration abi.ChainEpoch // Epoch during which the sector expires
DealWeight abi.DealWeight // Integral of active deals over sector lifetime
VerifiedDealWeight abi.DealWeight // Integral of active verified deals over sector lifetime
InitialPledge abi.TokenAmount // Pledge collected to commit this sector
// Expiration Info
OnTime abi.ChainEpoch
// non-zero if sector is faulty, epoch at which it will be permanently
// removed if it doesn't recover
Early abi.ChainEpoch
} }
type SealedRef struct { type SealedRef struct {
SectorID abi.SectorNumber SectorID abi.SectorNumber
Offset uint64 Offset abi.PaddedPieceSize
Size abi.UnpaddedPieceSize Size abi.UnpaddedPieceSize
} }

View File

@ -215,22 +215,24 @@ type StorageMinerStruct struct {
PledgeSector func(context.Context) error `perm:"write"` PledgeSector func(context.Context) error `perm:"write"`
SectorsStatus func(context.Context, abi.SectorNumber) (api.SectorInfo, error) `perm:"read"` SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"` SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"` SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"` SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"` SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"` SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"`
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"` SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"` WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[uint64][]storiface.WorkerJob, error) `perm:"admin"` WorkerJobs func(context.Context) (map[uint64][]storiface.WorkerJob, error) `perm:"admin"`
SealingSchedDiag func(context.Context) (interface{}, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"` StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
@ -845,8 +847,8 @@ func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error {
} }
// Get the status of a given sector by ID // Get the status of a given sector by ID
func (c *StorageMinerStruct) SectorsStatus(ctx context.Context, sid abi.SectorNumber) (api.SectorInfo, error) { func (c *StorageMinerStruct) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
return c.Internal.SectorsStatus(ctx, sid) return c.Internal.SectorsStatus(ctx, sid, showOnChainInfo)
} }
// List all staged sectors // List all staged sectors
@ -902,6 +904,10 @@ func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uint64][]stori
return c.Internal.WorkerJobs(ctx) return c.Internal.WorkerJobs(ctx)
} }
func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context) (interface{}, error) {
return c.Internal.SealingSchedDiag(ctx)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error { func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st) return c.Internal.StorageAttach(ctx, si, st)
} }

View File

@ -317,7 +317,7 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
if maj != cbg.MajUnsignedInt { if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field") return fmt.Errorf("wrong type for uint64 field")
} }
t.Offset = uint64(extra) t.Offset = abi.PaddedPieceSize(extra)
} }
// t.Size (abi.UnpaddedPieceSize) (uint64) // t.Size (abi.UnpaddedPieceSize) (uint64)

View File

@ -129,7 +129,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal) waitDealSealed(t, ctx, miner, client, deal, false)
// Retrieval // Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal) info, err := client.ClientGetDealInfo(ctx, *deal)
@ -138,6 +138,82 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data) testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data)
} }
func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background()
n, sn := b(t, 1, oneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
}
}()
{
data1 := make([]byte, 800)
rand.New(rand.NewSource(int64(3))).Read(data1)
r := bytes.NewReader(data1)
fcid1, err := client.ClientImportLocal(ctx, r)
if err != nil {
t.Fatal(err)
}
data2 := make([]byte, 800)
rand.New(rand.NewSource(int64(9))).Read(data2)
r2 := bytes.NewReader(data2)
fcid2, err := client.ClientImportLocal(ctx, r2)
if err != nil {
t.Fatal(err)
}
deal1 := startDeal(t, ctx, miner, client, fcid1, true)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true)
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal2)
require.NoError(t, err)
rf, _ := miner.SectorsRefs(ctx)
fmt.Printf("refs: %+v\n", rf)
testRetrieval(t, ctx, err, client, fcid2, &info.PieceCID, false, data2)
}
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid { func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid {
maddr, err := miner.ActorAddress(ctx) maddr, err := miner.ActorAddress(ctx)
if err != nil { if err != nil {
@ -162,7 +238,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
return deal return deal
} }
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid) { func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid, noseal bool) {
loop: loop:
for { for {
di, err := client.ClientGetDealInfo(ctx, *deal) di, err := client.ClientGetDealInfo(ctx, *deal)
@ -171,6 +247,9 @@ loop:
} }
switch di.State { switch di.State {
case storagemarket.StorageDealSealing: case storagemarket.StorageDealSealing:
if noseal {
return
}
startSealingWaiting(t, ctx, miner) startSealingWaiting(t, ctx, miner)
case storagemarket.StorageDealProposalRejected: case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected") t.Fatal("deal rejected")
@ -192,7 +271,7 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
require.NoError(t, err) require.NoError(t, err)
for _, snum := range snums { for _, snum := range snums {
si, err := miner.SectorsStatus(ctx, snum) si, err := miner.SectorsStatus(ctx, snum, false)
require.NoError(t, err) require.NoError(t, err)
t.Logf("Sector state: %s", si.State) t.Logf("Sector state: %s", si.State)

View File

@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
waitDealSealed(t, ctx, provider, client, deal) waitDealSealed(t, ctx, provider, client, deal, false)
<-minedTwo <-minedTwo

View File

@ -97,7 +97,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
for len(toCheck) > 0 { for len(toCheck) > 0 {
for n := range toCheck { for n := range toCheck {
st, err := miner.SectorsStatus(ctx, n) st, err := miner.SectorsStatus(ctx, n, false)
require.NoError(t, err) require.NoError(t, err)
if st.State == api.SectorState(sealing.Proving) { if st.State == api.SectorState(sealing.Proving) {
delete(toCheck, n) delete(toCheck, n)
@ -233,7 +233,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
require.Equal(t, p.MinerPower, p.TotalPower) require.Equal(t, p.MinerPower, p.TotalPower)
sectors := p.MinerPower.RawBytePower.Uint64() / uint64(ssz) sectors := p.MinerPower.RawBytePower.Uint64() / uint64(ssz)
require.Equal(t, nSectors+GenesisPreseals - 3, int(sectors)) // -3 just removed sectors require.Equal(t, nSectors+GenesisPreseals-3, int(sectors)) // -3 just removed sectors
mine = false mine = false
<-done <-done

View File

@ -320,7 +320,16 @@ func createAccount(ctx context.Context, bs bstore.Blockstore, cst cbor.IpldStore
func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot cid.Cid, template genesis.Template) (cid.Cid, error) { func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot cid.Cid, template genesis.Template) (cid.Cid, error) {
verifNeeds := make(map[address.Address]abi.PaddedPieceSize) verifNeeds := make(map[address.Address]abi.PaddedPieceSize)
var sum abi.PaddedPieceSize var sum abi.PaddedPieceSize
vm, err := vm.NewVM(stateroot, 0, &fakeRand{}, cs.Blockstore(), mkFakedSigSyscalls(cs.VMSys()))
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err)
}
for _, m := range template.Miners { for _, m := range template.Miners {
// Add the miner to the market actor's balance table
_, err = doExec(ctx, vm, builtin.StorageMarketActorAddr, m.Owner, builtin.MethodsMarket.AddBalance, mustEnc(adt.Empty))
for _, s := range m.Sectors { for _, s := range m.Sectors {
amt := s.Deal.PieceSize amt := s.Deal.PieceSize
verifNeeds[s.Deal.Client] += amt verifNeeds[s.Deal.Client] += amt
@ -344,6 +353,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
} }
_, err = doExecValue(ctx, vm, builtin.VerifiedRegistryActorAddr, verifregRoot, types.NewInt(0), builtin.MethodsVerifiedRegistry.AddVerifier, mustEnc(&verifreg.AddVerifierParams{ _, err = doExecValue(ctx, vm, builtin.VerifiedRegistryActorAddr, verifregRoot, types.NewInt(0), builtin.MethodsVerifiedRegistry.AddVerifier, mustEnc(&verifreg.AddVerifierParams{
Address: verifier, Address: verifier,
Allowance: abi.NewStoragePower(int64(sum)), // eh, close enough Allowance: abi.NewStoragePower(int64(sum)), // eh, close enough

View File

@ -56,17 +56,17 @@ var clientCmd = &cli.Command{
Name: "client", Name: "client",
Usage: "Make deals, store data, retrieve data", Usage: "Make deals, store data, retrieve data",
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
clientImportCmd, WithCategory("storage", clientDealCmd),
clientDropCmd, WithCategory("storage", clientQueryAskCmd),
clientCommPCmd, WithCategory("storage", clientListDeals),
clientLocalCmd, WithCategory("storage", clientGetDealCmd),
clientDealCmd, WithCategory("data", clientImportCmd),
clientFindCmd, WithCategory("data", clientDropCmd),
clientRetrieveCmd, WithCategory("data", clientLocalCmd),
clientQueryAskCmd, WithCategory("retrieval", clientFindCmd),
clientListDeals, WithCategory("retrieval", clientRetrieveCmd),
clientCarGenCmd, WithCategory("util", clientCommPCmd),
clientGetDealCmd, WithCategory("util", clientCarGenCmd),
}, },
} }
@ -164,7 +164,7 @@ var clientDropCmd = &cli.Command{
var clientCommPCmd = &cli.Command{ var clientCommPCmd = &cli.Command{
Name: "commP", Name: "commP",
Usage: "calculate the piece-cid (commP) of a CAR file", Usage: "Calculate the piece-cid (commP) of a CAR file",
ArgsUsage: "[inputFile minerAddress]", ArgsUsage: "[inputFile minerAddress]",
Flags: []cli.Flag{ Flags: []cli.Flag{
&CidBaseFlag, &CidBaseFlag,
@ -204,7 +204,7 @@ var clientCommPCmd = &cli.Command{
var clientCarGenCmd = &cli.Command{ var clientCarGenCmd = &cli.Command{
Name: "generate-car", Name: "generate-car",
Usage: "generate a car file from input", Usage: "Generate a car file from input",
ArgsUsage: "[inputPath outputPath]", ArgsUsage: "[inputPath outputPath]",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx) api, closer, err := GetFullNodeAPI(cctx)
@ -429,7 +429,7 @@ var clientDealCmd = &cli.Command{
var clientFindCmd = &cli.Command{ var clientFindCmd = &cli.Command{
Name: "find", Name: "find",
Usage: "find data in the network", Usage: "Find data in the network",
ArgsUsage: "[dataCid]", ArgsUsage: "[dataCid]",
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.StringFlag{ &cli.StringFlag{
@ -496,7 +496,7 @@ const DefaultMaxRetrievePrice = 1
var clientRetrieveCmd = &cli.Command{ var clientRetrieveCmd = &cli.Command{
Name: "retrieve", Name: "retrieve",
Usage: "retrieve data from network", Usage: "Retrieve data from network",
ArgsUsage: "[dataCid outputPath]", ArgsUsage: "[dataCid outputPath]",
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.StringFlag{ &cli.StringFlag{
@ -642,7 +642,7 @@ var clientRetrieveCmd = &cli.Command{
var clientQueryAskCmd = &cli.Command{ var clientQueryAskCmd = &cli.Command{
Name: "query-ask", Name: "query-ask",
Usage: "find a miners ask", Usage: "Find a miners ask",
ArgsUsage: "[minerAddress]", ArgsUsage: "[minerAddress]",
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.StringFlag{ &cli.StringFlag{

View File

@ -225,10 +225,19 @@ var mpoolStat = &cli.Command{
return out[i].addr < out[j].addr return out[i].addr < out[j].addr
}) })
var total mpStat
for _, stat := range out { for _, stat := range out {
fmt.Printf("%s, past: %d, cur: %d, future: %d\n", stat.addr, stat.past, stat.cur, stat.future) total.past += stat.past
total.cur += stat.cur
total.future += stat.future
fmt.Printf("%s: past: %d, cur: %d, future: %d\n", stat.addr, stat.past, stat.cur, stat.future)
} }
fmt.Println("-----")
fmt.Printf("total: past: %d, cur: %d, future: %d\n", total.past, total.cur, total.future)
return nil return nil
}, },
} }

View File

@ -156,7 +156,12 @@ var msigInspectCmd = &cli.Command{
Name: "inspect", Name: "inspect",
Usage: "Inspect a multisig wallet", Usage: "Inspect a multisig wallet",
ArgsUsage: "[address]", ArgsUsage: "[address]",
Flags: []cli.Flag{}, Flags: []cli.Flag{
&cli.BoolFlag{
Name: "vesting",
Usage: "Include vesting details",
},
},
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() { if !cctx.Args().Present() {
return ShowHelp(cctx, fmt.Errorf("must specify address of multisig to inspect")) return ShowHelp(cctx, fmt.Errorf("must specify address of multisig to inspect"))
@ -197,6 +202,13 @@ var msigInspectCmd = &cli.Command{
locked := mstate.AmountLocked(head.Height() - mstate.StartEpoch) locked := mstate.AmountLocked(head.Height() - mstate.StartEpoch)
fmt.Printf("Balance: %s\n", types.FIL(act.Balance)) fmt.Printf("Balance: %s\n", types.FIL(act.Balance))
fmt.Printf("Spendable: %s\n", types.FIL(types.BigSub(act.Balance, locked))) fmt.Printf("Spendable: %s\n", types.FIL(types.BigSub(act.Balance, locked)))
if cctx.Bool("vesting") {
fmt.Printf("InitialBalance: %s\n", types.FIL(mstate.InitialBalance))
fmt.Printf("StartEpoch: %d\n", mstate.StartEpoch)
fmt.Printf("UnlockDuration: %d\n", mstate.UnlockDuration)
}
fmt.Printf("Threshold: %d / %d\n", mstate.NumApprovalsThreshold, len(mstate.Signers)) fmt.Printf("Threshold: %d / %d\n", mstate.NumApprovalsThreshold, len(mstate.Signers))
fmt.Println("Signers:") fmt.Println("Signers:")
for _, s := range mstate.Signers { for _, s := range mstate.Signers {

View File

@ -45,7 +45,7 @@ var paychGetCmd = &cli.Command{
return ShowHelp(cctx, fmt.Errorf("failed to parse to address: %s", err)) return ShowHelp(cctx, fmt.Errorf("failed to parse to address: %s", err))
} }
amt, err := types.BigFromString(cctx.Args().Get(2)) amt, err := types.ParseFIL(cctx.Args().Get(2))
if err != nil { if err != nil {
return ShowHelp(cctx, fmt.Errorf("parsing amount failed: %s", err)) return ShowHelp(cctx, fmt.Errorf("parsing amount failed: %s", err))
} }
@ -58,7 +58,7 @@ var paychGetCmd = &cli.Command{
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
info, err := api.PaychGet(ctx, from, to, amt) info, err := api.PaychGet(ctx, from, to, types.BigInt(amt))
if err != nil { if err != nil {
return err return err
} }

View File

@ -119,7 +119,7 @@ func (p *Processor) Start(ctx context.Context) {
// TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where // TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where
// before doing "normal" processing. // before doing "normal" processing.
actorChanges, err := p.collectActorChanges(ctx, toProcess) actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess)
if err != nil { if err != nil {
log.Fatalw("Failed to collect actor changes", "error", err) log.Fatalw("Failed to collect actor changes", "error", err)
} }
@ -141,7 +141,7 @@ func (p *Processor) Start(ctx context.Context) {
}) })
grp.Go(func() error { grp.Go(func() error {
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil { if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
return xerrors.Errorf("Failed to handle reward changes: %w", err) return xerrors.Errorf("Failed to handle reward changes: %w", err)
} }
return nil return nil
@ -191,7 +191,7 @@ func (p *Processor) refreshViews() error {
return nil return nil
} }
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) { func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.Debugw("Collected Actor Changes", "duration", time.Since(start).String()) log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
@ -204,6 +204,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
var changes map[string]types.Actor var changes map[string]types.Actor
actorsSeen := map[cid.Cid]struct{}{} actorsSeen := map[cid.Cid]struct{}{}
var nullRounds []types.TipSetKey
var nullBlkMu sync.Mutex
// collect all actor state that has changes between block headers // collect all actor state that has changes between block headers
paDone := 0 paDone := 0
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) { parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
@ -217,6 +220,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
panic(err) panic(err)
} }
if pts.ParentState().Equals(bh.ParentStateRoot) {
nullBlkMu.Lock()
nullRounds = append(nullRounds, pts.Key())
nullBlkMu.Unlock()
}
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state. // collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
// TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider // TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider
// a separate strategy for deleted actors // a separate strategy for deleted actors
@ -232,12 +241,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
addr, err := address.NewFromString(a) addr, err := address.NewFromString(a)
if err != nil { if err != nil {
panic(err) log.Fatal(err.Error())
} }
ast, err := p.node.StateReadState(ctx, addr, pts.Key()) ast, err := p.node.StateReadState(ctx, addr, pts.Key())
if err != nil { if err != nil {
panic(err) log.Fatal(err.Error())
} }
// TODO look here for an empty state, maybe thats a sign the actor was deleted? // TODO look here for an empty state, maybe thats a sign the actor was deleted?
@ -267,7 +276,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
outMu.Unlock() outMu.Unlock()
} }
}) })
return out, nil return out, nullRounds, nil
} }
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) { func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {

View File

@ -8,10 +8,12 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
) )
type rewardActorInfo struct { type rewardActorInfo struct {
@ -51,34 +53,15 @@ create table if not exists chain_power
primary key, primary key,
baseline_power text not null baseline_power text not null
); );
create materialized view if not exists top_miners_by_base_reward as
with total_rewards_by_miner as (
select
b.miner,
sum(bbr.base_block_reward) as total_reward
from blocks b
inner join base_block_rewards bbr on b.parentstateroot = bbr.state_root
group by 1
) select
rank() over (order by total_reward desc),
miner,
total_reward
from total_rewards_by_miner
group by 2, 3;
create index if not exists top_miners_by_base_reward_miner_index
on top_miners_by_base_reward (miner);
`); err != nil { `); err != nil {
return err return err
} }
return tx.Commit() return tx.Commit()
} }
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips) error { func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
rewardChanges, err := p.processRewardActors(ctx, rewardTips) rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
if err != nil { if err != nil {
log.Fatalw("Failed to process reward actors", "error", err) log.Fatalw("Failed to process reward actors", "error", err)
} }
@ -90,7 +73,7 @@ func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTip
return nil return nil
} }
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) { func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.Debugw("Processed Reward Actors", "duration", time.Since(start).String()) log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
@ -123,6 +106,37 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
out = append(out, rw) out = append(out, rw)
} }
} }
for _, tsKey := range nullRounds {
var rw rewardActorInfo
tipset , err := p.node.ChainGetTipSet(ctx, tsKey)
if err != nil {
return nil, err
}
rw.common.tsKey = tipset.Key()
rw.common.height = tipset.Height()
rw.common.stateroot = tipset.ParentState()
rw.common.parentTsKey = tipset.Parents()
// get reward actor states at each tipset once for all updates
rewardActor, err := p.node.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
if err != nil {
return nil, err
}
rewardStateRaw, err := p.node.ChainReadObj(ctx, rewardActor.Head)
if err != nil {
return nil, err
}
var rewardActorState reward.State
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
return nil, err
}
rw.baseBlockReward = rewardActorState.ThisEpochReward
rw.baselinePower = rewardActorState.ThisEpochBaselinePower
out = append(out, rw)
}
return out, nil return out, nil
} }

View File

@ -8,6 +8,56 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error {
select {
case <-ctx.Done():
return nil
default:
}
tx, err := db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create materialized view if not exists top_miners_by_base_reward as
with total_rewards_by_miner as (
select
b.miner,
sum(bbr.base_block_reward * b.win_count) as total_reward
from blocks b
inner join base_block_rewards bbr on b.parentstateroot = bbr.state_root
group by 1
) select
rank() over (order by total_reward desc),
miner,
total_reward
from total_rewards_by_miner
group by 2, 3;
create index if not exists top_miners_by_base_reward_miner_index
on top_miners_by_base_reward (miner);
create materialized view if not exists top_miners_by_base_reward_max_height as
select
b."timestamp"as current_timestamp,
max(b.height) as current_height
from blocks b
join base_block_rewards bbr on b.parentstateroot = bbr.state_root
where bbr.base_block_reward is not null
group by 1
order by 1 desc
limit 1;
`); err != nil {
return xerrors.Errorf("create top_miner_by_base_reward views", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commiting top_miner_by_base_reward views", err)
}
return nil
}
func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error { func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -20,10 +70,15 @@ func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String()) log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String())
}() }()
_, err := db.Exec("REFRESH MATERIALIZED VIEW top_miners_by_base_reward;") _, err := db.Exec("refresh materialized view top_miners_by_base_reward;")
if err != nil { if err != nil {
return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err) return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err)
} }
_, err = db.Exec("refresh materialized view top_miners_by_base_reward_max_height;")
if err != nil {
return xerrors.Errorf("refresh top_miners_by_base_reward_max_height: %w", err)
}
return nil return nil
} }

View File

@ -6,6 +6,8 @@ import (
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
) )
var log = logging.Logger("scheduler") var log = logging.Logger("scheduler")
@ -21,10 +23,21 @@ func PrepareScheduler(db *sql.DB) *Scheduler {
return &Scheduler{db} return &Scheduler{db}
} }
func (s *Scheduler) setupSchema(ctx context.Context) error {
if err := setupTopMinerByBaseRewardSchema(ctx, s.db); err != nil {
return xerrors.Errorf("setup top miners by reward schema", err)
}
return nil
}
// Start the scheduler jobs at the defined intervals // Start the scheduler jobs at the defined intervals
func (s *Scheduler) Start(ctx context.Context) { func (s *Scheduler) Start(ctx context.Context) {
log.Debug("Starting Scheduler") log.Debug("Starting Scheduler")
if err := s.setupSchema(ctx); err != nil {
log.Fatalw("applying scheduling schema", err)
}
go func() { go func() {
// run once on start after schema has initialized // run once on start after schema has initialized
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)

View File

@ -95,7 +95,7 @@ var runCmd = &cli.Command{
&cli.StringFlag{ &cli.StringFlag{
Name: "address", Name: "address",
Usage: "locally reachable address", Usage: "locally reachable address",
Value: "0.0.0.0", Value: "0.0.0.0:3456",
}, },
&cli.BoolFlag{ &cli.BoolFlag{
Name: "no-local-storage", Name: "no-local-storage",
@ -116,6 +116,11 @@ var runCmd = &cli.Command{
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)", Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true, Value: true,
}, },
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.StringFlag{ &cli.StringFlag{
Name: "timeout", Name: "timeout",
Usage: "used when address is unspecified. must be a valid duration recognized by golang's time.ParseDuration function", Usage: "used when address is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
@ -295,7 +300,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("could not get api info: %w", err) return xerrors.Errorf("could not get api info: %w", err)
} }
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader()) remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
// Create / expose the worker // Create / expose the worker

View File

@ -24,9 +24,6 @@ import (
var infoCmd = &cli.Command{ var infoCmd = &cli.Command{
Name: "info", Name: "info",
Usage: "Print miner info", Usage: "Print miner info",
Flags: []cli.Flag{
&cli.BoolFlag{Name: "color"},
},
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color") color.NoColor = !cctx.Bool("color")
@ -225,7 +222,7 @@ func sectorsInfo(ctx context.Context, napi api.StorageMiner) error {
"Total": len(sectors), "Total": len(sectors),
} }
for _, s := range sectors { for _, s := range sectors {
st, err := napi.SectorsStatus(ctx, s) st, err := napi.SectorsStatus(ctx, s, false)
if err != nil { if err != nil {
return err return err
} }

View File

@ -436,7 +436,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{ smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{
SealProofType: spt, SealProofType: spt,
}, sectorstorage.SealerConfig{true, true, true, true}, nil, sa) }, sectorstorage.SealerConfig{10, true, true, true, true}, nil, sa)
if err != nil { if err != nil {
return err return err
} }

View File

@ -75,6 +75,9 @@ func main() {
Usage: "specify other actor to check state for (read only)", Usage: "specify other actor to check state for (read only)",
Aliases: []string{"a"}, Aliases: []string{"a"},
}, },
&cli.BoolFlag{
Name: "color",
},
&cli.StringFlag{ &cli.StringFlag{
Name: "repo", Name: "repo",
EnvVars: []string{"LOTUS_PATH"}, EnvVars: []string{"LOTUS_PATH"},
@ -104,6 +107,7 @@ func getActorAddress(ctx context.Context, nodeAPI api.StorageMiner, overrideMadd
if err != nil { if err != nil {
return maddr, err return maddr, err
} }
return
} }
maddr, err = nodeAPI.ActorAddress(ctx) maddr, err = nodeAPI.ActorAddress(ctx)

View File

@ -7,6 +7,7 @@ import (
"text/tabwriter" "text/tabwriter"
"time" "time"
"github.com/fatih/color"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,6 +33,8 @@ var provingFaultsCmd = &cli.Command{
Name: "faults", Name: "faults",
Usage: "View the currently known proving faulty sectors information", Usage: "View the currently known proving faulty sectors information",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color")
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil { if err != nil {
return err return err
@ -66,6 +69,8 @@ var provingFaultsCmd = &cli.Command{
} }
} }
fmt.Printf("Miner: %s\n", color.BlueString("%s", maddr))
head, err := api.ChainHead(ctx) head, err := api.ChainHead(ctx)
if err != nil { if err != nil {
return xerrors.Errorf("getting chain head: %w", err) return xerrors.Errorf("getting chain head: %w", err)
@ -101,6 +106,8 @@ var provingInfoCmd = &cli.Command{
Name: "info", Name: "info",
Usage: "View current state information", Usage: "View current state information",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color")
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil { if err != nil {
return err return err
@ -135,6 +142,8 @@ var provingInfoCmd = &cli.Command{
return xerrors.Errorf("getting miner deadlines: %w", err) return xerrors.Errorf("getting miner deadlines: %w", err)
} }
fmt.Printf("Miner: %s\n", color.BlueString("%s", maddr))
var mas miner.State var mas miner.State
{ {
mact, err := api.StateGetActor(ctx, maddr, types.EmptyTSK) mact, err := api.StateGetActor(ctx, maddr, types.EmptyTSK)
@ -240,6 +249,8 @@ var provingDeadlinesCmd = &cli.Command{
Name: "deadlines", Name: "deadlines",
Usage: "View the current proving period deadlines information", Usage: "View the current proving period deadlines information",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
color.NoColor = !cctx.Bool("color")
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil { if err != nil {
return err return err
@ -284,6 +295,8 @@ var provingDeadlinesCmd = &cli.Command{
} }
} }
fmt.Printf("Miner: %s\n", color.BlueString("%s", maddr))
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintln(tw, "deadline\tpartitions\tsectors\tproven") _, _ = fmt.Fprintln(tw, "deadline\tpartitions\tsectors\tproven")

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"os" "os"
@ -24,6 +25,7 @@ var sealingCmd = &cli.Command{
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
sealingJobsCmd, sealingJobsCmd,
sealingWorkersCmd, sealingWorkersCmd,
sealingSchedDiagCmd,
}, },
} }
@ -176,3 +178,31 @@ var sealingJobsCmd = &cli.Command{
return tw.Flush() return tw.Flush()
}, },
} }
var sealingSchedDiagCmd = &cli.Command{
Name: "sched-diag",
Usage: "Dump internal scheduler state",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
st, err := nodeApi.SealingSchedDiag(ctx)
if err != nil {
return err
}
j, err := json.MarshalIndent(&st, "", " ")
if err != nil {
return err
}
fmt.Println(string(j))
return nil
},
}

View File

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"os" "os"
"sort" "sort"
"strconv" "strconv"
@ -31,6 +32,7 @@ var sectorsCmd = &cli.Command{
sectorsMarkForUpgradeCmd, sectorsMarkForUpgradeCmd,
sectorsStartSealCmd, sectorsStartSealCmd,
sectorsSealDelayCmd, sectorsSealDelayCmd,
sectorsCapacityCollateralCmd,
}, },
} }
@ -58,6 +60,10 @@ var sectorsStatusCmd = &cli.Command{
Name: "log", Name: "log",
Usage: "display event log", Usage: "display event log",
}, },
&cli.BoolFlag{
Name: "on-chain-info",
Usage: "show sector on chain info",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
@ -76,7 +82,8 @@ var sectorsStatusCmd = &cli.Command{
return err return err
} }
status, err := nodeApi.SectorsStatus(ctx, abi.SectorNumber(id)) onChainInfo := cctx.Bool("on-chain-info")
status, err := nodeApi.SectorsStatus(ctx, abi.SectorNumber(id), onChainInfo)
if err != nil { if err != nil {
return err return err
} }
@ -96,6 +103,19 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("Last Error:\t\t%s\n", status.LastErr) fmt.Printf("Last Error:\t\t%s\n", status.LastErr)
} }
if onChainInfo {
fmt.Printf("\nSector On Chain Info\n")
fmt.Printf("SealProof:\t\t%x\n", status.SealProof)
fmt.Printf("Activation:\t\t%v\n", status.Activation)
fmt.Printf("Expiration:\t\t%v\n", status.Expiration)
fmt.Printf("DealWeight:\t\t%v\n", status.DealWeight)
fmt.Printf("VerifiedDealWeight:\t\t%v\n", status.VerifiedDealWeight)
fmt.Printf("InitialPledge:\t\t%v\n", status.InitialPledge)
fmt.Printf("\nExpiration Info\n")
fmt.Printf("OnTime:\t\t%v\n", status.OnTime)
fmt.Printf("Early:\t\t%v\n", status.Early)
}
if cctx.Bool("log") { if cctx.Bool("log") {
fmt.Printf("--------\nEvent Log:\n") fmt.Printf("--------\nEvent Log:\n")
@ -163,7 +183,7 @@ var sectorsListCmd = &cli.Command{
w := tabwriter.NewWriter(os.Stdout, 8, 4, 1, ' ', 0) w := tabwriter.NewWriter(os.Stdout, 8, 4, 1, ' ', 0)
for _, s := range list { for _, s := range list {
st, err := nodeApi.SectorsStatus(ctx, s) st, err := nodeApi.SectorsStatus(ctx, s, false)
if err != nil { if err != nil {
fmt.Fprintf(w, "%d:\tError: %s\n", s, err) fmt.Fprintf(w, "%d:\tError: %s\n", s, err)
continue continue
@ -321,6 +341,53 @@ var sectorsSealDelayCmd = &cli.Command{
}, },
} }
var sectorsCapacityCollateralCmd = &cli.Command{
Name: "get-cc-collateral",
Usage: "Get the collateral required to pledge a committed capacity sector",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "expiration",
Usage: "the epoch when the sector will expire",
},
},
Action: func(cctx *cli.Context) error {
mApi, mCloser, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer mCloser()
nApi, nCloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer nCloser()
ctx := lcli.ReqContext(cctx)
maddr, err := mApi.ActorAddress(ctx)
if err != nil {
return err
}
pci := miner.SectorPreCommitInfo{
Expiration: abi.ChainEpoch(cctx.Uint64("expiration")),
}
if pci.Expiration == 0 {
pci.Expiration = miner.MaxSectorExpirationExtension
}
pc, err := nApi.StateMinerInitialPledgeCollateral(ctx, maddr, pci, types.EmptyTSK)
if err != nil {
return err
}
fmt.Printf("Estimated collateral: %s\n", types.FIL(pc))
return nil
},
}
var sectorsUpdateCmd = &cli.Command{ var sectorsUpdateCmd = &cli.Command{
Name: "update-state", Name: "update-state",
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery", Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",

3
go.mod
View File

@ -25,10 +25,11 @@ require (
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.5.1 github.com/filecoin-project/go-fil-markets v0.5.1
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-multistore v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200723200950-ed2e57dde6df github.com/filecoin-project/sector-storage v0.0.0-20200727112136-9377cb376d25
github.com/filecoin-project/specs-actors v0.8.1-0.20200724015154-3c690d9b7e1d github.com/filecoin-project/specs-actors v0.8.1-0.20200724015154-3c690d9b7e1d
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/filecoin-project/storage-fsm v0.0.0-20200720190000-2cfe2fe3c334 github.com/filecoin-project/storage-fsm v0.0.0-20200720190000-2cfe2fe3c334

6
go.sum
View File

@ -248,6 +248,8 @@ github.com/filecoin-project/go-fil-markets v0.5.1 h1:Y69glslNCuXnygfesCmyilTVhEE
github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s= github.com/filecoin-project/go-fil-markets v0.5.1/go.mod h1:GKGigsFNMvKmx/+Mcn7093TdZTiCDLc7YGxQ7d6fq2s=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-multistore v0.0.1 h1:wXCd02azCxEcMNlDE9lksraQO+iIjFGNw01IZyf8GPA=
github.com/filecoin-project/go-multistore v0.0.1/go.mod h1:z8NeSPWubEvrzi0XolhZ1NjTeW9ZDR779M+EDhf4QIQ=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
@ -264,8 +266,8 @@ github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM= github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 h1:miw6hiusb/MkV1ryoqUKKWnvHhPW00AYtyeCj0L8pqo= github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 h1:miw6hiusb/MkV1ryoqUKKWnvHhPW00AYtyeCj0L8pqo=
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo= github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo=
github.com/filecoin-project/sector-storage v0.0.0-20200723200950-ed2e57dde6df h1:VDdWrCNUNx6qeHnGU9oAy+izuGM02it9V/5+MJyhZQw= github.com/filecoin-project/sector-storage v0.0.0-20200727112136-9377cb376d25 h1:sTonFkDw3KrIFIJTfIevYXyk+Mu9LbjbOHn/fWoMOMc=
github.com/filecoin-project/sector-storage v0.0.0-20200723200950-ed2e57dde6df/go.mod h1:7EE+f7jM4kCy2MKHoiiwNDQGJSb+QQzZ+y+/17ugq4w= github.com/filecoin-project/sector-storage v0.0.0-20200727112136-9377cb376d25/go.mod h1:f9W29dKqNFm8Su4OddGwkAQOYMKYUR5Fk2oC/JZDjCI=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=

View File

@ -225,7 +225,7 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context
if bestSi.State == sealing.UndefinedSectorState { if bestSi.State == sealing.UndefinedSectorState {
return 0, 0, 0, xerrors.New("no sealed sector found") return 0, 0, 0, xerrors.New("no sealed sector found")
} }
return uint64(best.SectorID), best.Offset, uint64(best.Size), nil return uint64(best.SectorID), uint64(best.Offset.Unpadded()), uint64(best.Size), nil
} }
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error { func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {

View File

@ -125,6 +125,10 @@ func DefaultStorageMiner() *StorageMiner {
AllowPreCommit2: true, AllowPreCommit2: true,
AllowCommit: true, AllowCommit: true,
AllowUnseal: true, AllowUnseal: true,
// Default to 10 - tcp should still be able to figure this out, and
// it's the ratio between 10gbit / 1gbit
ParallelFetchLimit: 10,
}, },
Dealmaking: DealmakingConfig{ Dealmaking: DealmakingConfig{

View File

@ -33,6 +33,7 @@ import (
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
@ -556,7 +557,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
return f.Close() return f.Close()
} }
func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *importmgr.Store) (cid.Cid, error) { func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multistore.Store) (cid.Cid, error) {
f, err := os.Open(ref.Path) f, err := os.Open(ref.Path)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err

View File

@ -952,8 +952,12 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr
return types.EmptyInt, err return types.EmptyInt, err
} }
var dealWeights market.VerifyDealsForActivationReturn dealWeights := market.VerifyDealsForActivationReturn{
{ DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
}
if len(pci.DealIDs) != 0 {
var err error var err error
params, err := actors.SerializeParams(&market.VerifyDealsForActivationParams{ params, err := actors.SerializeParams(&market.VerifyDealsForActivationParams{
DealIDs: pci.DealIDs, DealIDs: pci.DealIDs,
@ -980,11 +984,13 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr
} }
} }
ssize, err := pci.SealProof.SectorSize() mi, err := a.StateMinerInfo(ctx, maddr, tsk)
if err != nil { if err != nil {
return types.EmptyInt, err return types.EmptyInt, err
} }
ssize := mi.SectorSize
duration := pci.Expiration - ts.Height() // NB: not exactly accurate, but should always lead us to *over* estimate, not under duration := pci.Expiration - ts.Height() // NB: not exactly accurate, but should always lead us to *over* estimate, not under
circSupply, err := a.StateManager.CirculatingSupply(ctx, ts) circSupply, err := a.StateManager.CirculatingSupply(ctx, ts)

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/filecoin-project/sector-storage/fsutil" "github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"net/http" "net/http"
@ -102,7 +103,7 @@ func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
return sm.Miner.PledgeSector() return sm.Miner.PledgeSector()
} }
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber) (api.SectorInfo, error) { func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
info, err := sm.Miner.GetSectorInfo(sid) info, err := sm.Miner.GetSectorInfo(sid)
if err != nil { if err != nil {
return api.SectorInfo{}, err return api.SectorInfo{}, err
@ -126,7 +127,7 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
} }
} }
return api.SectorInfo{ sInfo := api.SectorInfo{
SectorID: sid, SectorID: sid,
State: api.SectorState(info.State), State: api.SectorState(info.State),
CommD: info.CommD, CommD: info.CommD,
@ -145,7 +146,40 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
LastErr: info.LastErr, LastErr: info.LastErr,
Log: log, Log: log,
}, nil // on chain info
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
if !showOnChainInfo {
return sInfo, nil
}
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
if err != nil {
return sInfo, nil
}
sInfo.SealProof = onChainInfo.SealProof
sInfo.Activation = onChainInfo.Activation
sInfo.Expiration = onChainInfo.Expiration
sInfo.DealWeight = onChainInfo.DealWeight
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
sInfo.InitialPledge = onChainInfo.InitialPledge
ex, err := sm.Full.StateSectorExpiration(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
if err != nil {
return sInfo, nil
}
sInfo.OnTime = ex.OnTime
sInfo.Early = ex.Early
return sInfo, nil
} }
// List all staged sectors // List all staged sectors
@ -229,6 +263,10 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error
return sm.StorageMgr.AddWorker(ctx, w) return sm.StorageMgr.AddWorker(ctx, w)
} }
func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context) (interface{}, error) {
return sm.StorageMgr.SchedDiag(ctx)
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path) fi, err := os.Open(path)
if err != nil { if err != nil {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"time" "time"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/lib/bufbstore"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -42,7 +43,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMult
return nil, xerrors.Errorf("getting datastore out of reop: %w", err) return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
} }
mds, err := importmgr.NewMultiDstore(ds) mds, err := multistore.NewMultiDstore(ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -9,6 +9,7 @@ import (
format "github.com/ipfs/go-ipld-format" format "github.com/ipfs/go-ipld-format"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-multistore"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
@ -27,7 +28,7 @@ type ChainGCBlockstore blockstore.GCBlockstore
type ChainExchange exchange.Interface type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService type ChainBlockService bserv.BlockService
type ClientMultiDstore *importmgr.MultiStore type ClientMultiDstore *multistore.MultiStore
type ClientImportMgr *importmgr.Mgr type ClientImportMgr *importmgr.Mgr
type ClientBlockstore blockstore.Blockstore type ClientBlockstore blockstore.Blockstore
type ClientDealStore *statestore.StateStore type ClientDealStore *statestore.StateStore

View File

@ -382,7 +382,6 @@ func mockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test
} }
} }
for i, def := range storage { for i, def := range storage {
// TODO: support non-bootstrap miners // TODO: support non-bootstrap miners
@ -494,6 +493,8 @@ func TestAPIDealFlowReal(t *testing.T) {
logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR") logging.SetLogLevel("storageminer", "ERROR")
saminer.PreCommitChallengeDelay = 5
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, false) test.TestDealFlow(t, builder, time.Second, false, false)
}) })
@ -501,6 +502,10 @@ func TestAPIDealFlowReal(t *testing.T) {
t.Run("fast-retrieval", func(t *testing.T) { t.Run("fast-retrieval", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, true) test.TestDealFlow(t, builder, time.Second, false, true)
}) })
t.Run("retrieval-second", func(t *testing.T) {
test.TestSenondDealRetrieval(t, builder, time.Second)
})
} }
func TestDealMining(t *testing.T) { func TestDealMining(t *testing.T) {

View File

@ -1,95 +0,0 @@
package importmgr
import (
"context"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type multiReadBs struct {
// TODO: some caching
mds *MultiStore
}
func (m *multiReadBs) Has(cid cid.Cid) (bool, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
return true, nil
}
return false, merr
}
func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
m.mds.lk.RLock()
defer m.mds.lk.RUnlock()
var merr error
for i, store := range m.mds.open {
has, err := store.Bstore.Has(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("has (ds %d): %w", i, err))
continue
}
if !has {
continue
}
val, err := store.Bstore.Get(cid)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("get (ds %d): %w", i, err))
continue
}
return val, nil
}
if merr == nil {
return nil, blockstore.ErrNotFound
}
return nil, merr
}
func (m *multiReadBs) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) GetSize(cid cid.Cid) (int, error) {
return 0, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) Put(block blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("operation not supported")
}
func (m *multiReadBs) HashOnRead(enabled bool) {
return
}
var _ blockstore.Blockstore = &multiReadBs{}

View File

@ -9,10 +9,12 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/filecoin-project/go-multistore"
) )
type Mgr struct { type Mgr struct {
mds *MultiStore mds *multistore.MultiStore
ds datastore.Batching ds datastore.Batching
Blockstore blockstore.Blockstore Blockstore blockstore.Blockstore
@ -27,12 +29,10 @@ const (
LMTime = "mtime" // File modification timestamp LMTime = "mtime" // File modification timestamp
) )
func New(mds *MultiStore, ds datastore.Batching) *Mgr { func New(mds *multistore.MultiStore, ds datastore.Batching) *Mgr {
return &Mgr{ return &Mgr{
mds: mds, mds: mds,
Blockstore: &multiReadBs{ Blockstore: mds.MultiReadBlockstore(),
mds: mds,
},
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"), ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
} }
@ -42,7 +42,7 @@ type StoreMeta struct {
Labels map[string]string Labels map[string]string
} }
func (m *Mgr) NewStore() (int, *Store, error) { func (m *Mgr) NewStore() (int, *multistore.Store, error) {
id := m.mds.Next() id := m.mds.Next()
st, err := m.mds.Get(id) st, err := m.mds.Get(id)
if err != nil { if err != nil {

View File

@ -1,188 +0,0 @@
package importmgr
import (
"encoding/json"
"fmt"
"sort"
"sync"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)
type MultiStore struct {
ds datastore.Batching
open map[int]*Store
next int
lk sync.RWMutex
}
var dsListKey = datastore.NewKey("/list")
var dsMultiKey = datastore.NewKey("/multi")
func NewMultiDstore(ds datastore.Batching) (*MultiStore, error) {
listBytes, err := ds.Get(dsListKey)
if xerrors.Is(err, datastore.ErrNotFound) {
listBytes, _ = json.Marshal([]int{})
} else if err != nil {
return nil, xerrors.Errorf("could not read multistore list: %w", err)
}
var ids []int
if err := json.Unmarshal(listBytes, &ids); err != nil {
return nil, xerrors.Errorf("could not unmarshal multistore list: %w", err)
}
mds := &MultiStore{
ds: ds,
open: map[int]*Store{},
}
for _, i := range ids {
if i > mds.next {
mds.next = i
}
_, err := mds.Get(i)
if err != nil {
return nil, xerrors.Errorf("open store %d: %w", i, err)
}
}
return mds, nil
}
func (mds *MultiStore) Next() int {
mds.lk.Lock()
defer mds.lk.Unlock()
mds.next++
return mds.next
}
func (mds *MultiStore) updateStores() error {
stores := make([]int, 0, len(mds.open))
for k := range mds.open {
stores = append(stores, k)
}
sort.Ints(stores)
listBytes, err := json.Marshal(stores)
if err != nil {
return xerrors.Errorf("could not marshal list: %w", err)
}
err = mds.ds.Put(dsListKey, listBytes)
if err != nil {
return xerrors.Errorf("could not save stores list: %w", err)
}
return nil
}
func (mds *MultiStore) Get(i int) (*Store, error) {
mds.lk.Lock()
defer mds.lk.Unlock()
store, ok := mds.open[i]
if ok {
return store, nil
}
wds := ktds.Wrap(mds.ds, ktds.PrefixTransform{
Prefix: dsMultiKey.ChildString(fmt.Sprintf("%d", i)),
})
var err error
mds.open[i], err = openStore(wds)
if err != nil {
return nil, xerrors.Errorf("could not open new store: %w", err)
}
err = mds.updateStores()
if err != nil {
return nil, xerrors.Errorf("updating stores: %w", err)
}
return mds.open[i], nil
}
func (mds *MultiStore) List() []int {
mds.lk.RLock()
defer mds.lk.RUnlock()
out := make([]int, 0, len(mds.open))
for i := range mds.open {
out = append(out, i)
}
sort.Ints(out)
return out
}
func (mds *MultiStore) Delete(i int) error {
mds.lk.Lock()
defer mds.lk.Unlock()
store, ok := mds.open[i]
if !ok {
return nil
}
delete(mds.open, i)
err := store.Close()
if err != nil {
return xerrors.Errorf("closing store: %w", err)
}
err = mds.updateStores()
if err != nil {
return xerrors.Errorf("updating stores: %w", err)
}
qres, err := store.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("query error: %w", err)
}
defer qres.Close() //nolint:errcheck
b, err := store.ds.Batch()
if err != nil {
return xerrors.Errorf("batch error: %w", err)
}
for r := range qres.Next() {
if r.Error != nil {
_ = b.Commit()
return xerrors.Errorf("iterator error: %w", err)
}
err := b.Delete(datastore.NewKey(r.Key))
if err != nil {
_ = b.Commit()
return xerrors.Errorf("adding to batch: %w", err)
}
}
err = b.Commit()
if err != nil {
return xerrors.Errorf("committing: %w", err)
}
return nil
}
func (mds *MultiStore) Close() error {
mds.lk.Lock()
defer mds.lk.Unlock()
var err error
for _, s := range mds.open {
err = multierr.Append(err, s.Close())
}
mds.open = make(map[int]*Store)
return err
}

View File

@ -1,54 +0,0 @@
package importmgr
import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
)
type Store struct {
ds datastore.Batching
fm *filestore.FileManager
Fstore *filestore.Filestore
Bstore blockstore.Blockstore
bsvc blockservice.BlockService
DAG ipld.DAGService
}
func openStore(ds datastore.Batching) (*Store, error) {
blocks := namespace.Wrap(ds, datastore.NewKey("blocks"))
bs := blockstore.NewBlockstore(blocks)
fm := filestore.NewFileManager(ds, "/")
fm.AllowFiles = true
fstore := filestore.NewFilestore(bs, fm)
ibs := blockstore.NewIdStore(fstore)
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
dag := merkledag.NewDAGService(bsvc)
return &Store{
ds: ds,
fm: fm,
Fstore: fstore,
Bstore: ibs,
bsvc: bsvc,
DAG: dag,
}, nil
}
func (s *Store) Close() error {
return s.bsvc.Close()
}

View File

@ -64,7 +64,7 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
return sbc return sbc
} }
func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset uint64, size abi.UnpaddedPieceSize) error { func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error {
st.keyLk.Lock() // TODO: make this multithreaded st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock() defer st.keyLk.Unlock()
@ -102,7 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize
return 0, err return 0, err
} }
err = st.writeRef(d.DealID, sn, offset, size) err = st.writeRef(d.DealID, sn, abi.PaddedPieceSize(offset), size)
if err != nil { if err != nil {
return 0, xerrors.Errorf("writeRef: %w", err) return 0, xerrors.Errorf("writeRef: %w", err)
} }