Merge branch 'next' into feat/windows
This commit is contained in:
commit
16ab71d302
@ -43,9 +43,11 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof
|
||||
return nil
|
||||
}
|
||||
|
||||
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
|
||||
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, stores.PathStorage, stores.AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire sector in checkProvable: %w", err)
|
||||
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
|
||||
bad = append(bad, sector)
|
||||
return nil
|
||||
}
|
||||
|
||||
if lp.Sealed == "" || lp.Cache == "" {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/sector-storage/storiface"
|
||||
)
|
||||
|
||||
@ -218,6 +219,28 @@ func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
|
||||
have, err := pf.allocated.RunIterator()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := fsutil.Deallocate(pf.file, int64(offset), int64(size)); err != nil {
|
||||
return xerrors.Errorf("deallocating: %w", err)
|
||||
}
|
||||
|
||||
s, err := rlepluslazy.Subtract(have, pieceRun(offset, size))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil {
|
||||
return xerrors.Errorf("writing trailer: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
|
||||
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
|
||||
return nil, xerrors.Errorf("seek piece start: %w", err)
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
ffi "github.com/filecoin-project/filecoin-ffi"
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
commcid "github.com/filecoin-project/go-fil-commcid"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
@ -81,7 +82,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
||||
|
||||
var stagedPath stores.SectorPaths
|
||||
if len(existingPieceSizes) == 0 {
|
||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, true)
|
||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, stores.PathSealing)
|
||||
if err != nil {
|
||||
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
||||
}
|
||||
@ -91,7 +92,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
||||
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
|
||||
}
|
||||
} else {
|
||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true)
|
||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathSealing)
|
||||
if err != nil {
|
||||
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
||||
}
|
||||
@ -198,12 +199,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
||||
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
|
||||
|
||||
// try finding existing
|
||||
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
|
||||
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||
var pf *partialFile
|
||||
|
||||
switch {
|
||||
case xerrors.Is(err, storiface.ErrSectorNotFound):
|
||||
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, false)
|
||||
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
|
||||
}
|
||||
@ -240,7 +241,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
||||
return nil
|
||||
}
|
||||
|
||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, false)
|
||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire sealed sector paths: %w", err)
|
||||
}
|
||||
@ -253,7 +254,10 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
||||
defer sealed.Close()
|
||||
|
||||
var at, nextat abi.PaddedPieceSize
|
||||
for {
|
||||
first := true
|
||||
for first || toUnseal.HasNext() {
|
||||
first = false
|
||||
|
||||
piece, err := toUnseal.NextRun()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting next range to unseal: %w", err)
|
||||
@ -358,7 +362,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
|
||||
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire unsealed sector path: %w", err)
|
||||
}
|
||||
@ -395,7 +399,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
|
||||
}
|
||||
|
||||
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true)
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, stores.PathSealing)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
@ -452,7 +456,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
||||
}
|
||||
|
||||
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing)
|
||||
if err != nil {
|
||||
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
@ -470,7 +474,7 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
||||
}
|
||||
|
||||
func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("acquire sector paths: %w", err)
|
||||
}
|
||||
@ -502,10 +506,62 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
||||
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
|
||||
if len(keepUnsealed) > 0 {
|
||||
return xerrors.Errorf("keepUnsealed unsupported") // TODO: impl for fastretrieval copies
|
||||
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
|
||||
|
||||
sr := pieceRun(0, maxPieceSize)
|
||||
|
||||
for _, s := range keepUnsealed {
|
||||
si := &rlepluslazy.RunSliceIterator{}
|
||||
if s.Offset != 0 {
|
||||
si.Runs = append(si.Runs, rlepluslazy.Run{Val: false, Len: uint64(s.Offset)})
|
||||
}
|
||||
si.Runs = append(si.Runs, rlepluslazy.Run{Val: true, Len: uint64(s.Size)})
|
||||
|
||||
var err error
|
||||
sr, err = rlepluslazy.Subtract(sr, si)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||
}
|
||||
defer done()
|
||||
|
||||
pf, err := openPartialFile(maxPieceSize, paths.Unsealed)
|
||||
if xerrors.Is(err, os.ErrNotExist) {
|
||||
return xerrors.Errorf("opening partial file: %w", err)
|
||||
}
|
||||
|
||||
var at uint64
|
||||
for sr.HasNext() {
|
||||
r, err := sr.NextRun()
|
||||
if err != nil {
|
||||
_ = pf.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
offset := at
|
||||
at += r.Len
|
||||
if !r.Val {
|
||||
continue
|
||||
}
|
||||
|
||||
err = pf.Free(storiface.PaddedByteIndex(abi.UnpaddedPieceSize(offset).Padded()), abi.UnpaddedPieceSize(r.Len).Padded())
|
||||
if err != nil {
|
||||
_ = pf.Close()
|
||||
return xerrors.Errorf("free partial file range: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := pf.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false)
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, stores.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||
}
|
||||
@ -542,33 +598,65 @@ func GeneratePieceCIDFromFile(proofType abi.RegisteredSealProof, piece io.Reader
|
||||
return pieceCID, werr()
|
||||
}
|
||||
|
||||
func GenerateUnsealedCID(proofType abi.RegisteredSealProof, pieces []abi.PieceInfo) (cid.Cid, error) {
|
||||
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
|
||||
|
||||
padPieces := make([]abi.PaddedPieceSize, 0)
|
||||
|
||||
toFill := uint64(-oldLength % newPieceLength)
|
||||
|
||||
n := bits.OnesCount64(toFill)
|
||||
var sum abi.PaddedPieceSize
|
||||
for _, p := range pieces {
|
||||
sum += p.Size
|
||||
for i := 0; i < n; i++ {
|
||||
next := bits.TrailingZeros64(toFill)
|
||||
psize := uint64(1) << uint(next)
|
||||
toFill ^= psize
|
||||
|
||||
padded := abi.PaddedPieceSize(psize)
|
||||
padPieces = append(padPieces, padded)
|
||||
sum += padded
|
||||
}
|
||||
|
||||
return padPieces, sum
|
||||
}
|
||||
|
||||
func GenerateUnsealedCID(proofType abi.RegisteredSealProof, pieces []abi.PieceInfo) (cid.Cid, error) {
|
||||
ssize, err := proofType.SectorSize()
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
{
|
||||
// pad remaining space with 0 CommPs
|
||||
toFill := uint64(abi.PaddedPieceSize(ssize) - sum)
|
||||
n := bits.OnesCount64(toFill)
|
||||
for i := 0; i < n; i++ {
|
||||
next := bits.TrailingZeros64(toFill)
|
||||
psize := uint64(1) << uint(next)
|
||||
toFill ^= psize
|
||||
pssize := abi.PaddedPieceSize(ssize)
|
||||
allPieces := make([]abi.PieceInfo, 0, len(pieces))
|
||||
if len(pieces) == 0 {
|
||||
allPieces = append(allPieces, abi.PieceInfo{
|
||||
Size: pssize,
|
||||
PieceCID: zerocomm.ZeroPieceCommitment(pssize.Unpadded()),
|
||||
})
|
||||
} else {
|
||||
var sum abi.PaddedPieceSize
|
||||
|
||||
unpadded := abi.PaddedPieceSize(psize).Unpadded()
|
||||
pieces = append(pieces, abi.PieceInfo{
|
||||
Size: unpadded.Padded(),
|
||||
PieceCID: zerocomm.ZeroPieceCommitment(unpadded),
|
||||
})
|
||||
padTo := func(pads []abi.PaddedPieceSize) {
|
||||
for _, p := range pads {
|
||||
allPieces = append(allPieces, abi.PieceInfo{
|
||||
Size: p,
|
||||
PieceCID: zerocomm.ZeroPieceCommitment(p.Unpadded()),
|
||||
})
|
||||
|
||||
sum += p
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range pieces {
|
||||
ps, _ := GetRequiredPadding(sum, p.Size)
|
||||
padTo(ps)
|
||||
|
||||
allPieces = append(allPieces, p)
|
||||
sum += p.Size
|
||||
}
|
||||
|
||||
ps, _ := GetRequiredPadding(sum, pssize)
|
||||
padTo(ps)
|
||||
}
|
||||
|
||||
return ffi.GenerateUnsealedCID(proofType, pieces)
|
||||
return ffi.GenerateUnsealedCID(proofType, allPieces)
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/ipfs/go-cid"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
@ -120,7 +121,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
||||
t.Fatal("read wrong bytes")
|
||||
}
|
||||
|
||||
p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, false)
|
||||
p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -488,3 +489,105 @@ func requireFDsClosed(t *testing.T, start int) {
|
||||
log.Infow("open FDs", "start", start, "now", openNow)
|
||||
require.Equal(t, start, openNow, "FDs shouldn't leak")
|
||||
}
|
||||
|
||||
func TestGenerateUnsealedCID(t *testing.T) {
|
||||
pt := abi.RegisteredSealProof_StackedDrg2KiBV1
|
||||
ups := int(abi.PaddedPieceSize(2048).Unpadded())
|
||||
|
||||
commP := func(b []byte) cid.Cid {
|
||||
pf, werr, err := ToReadableFile(bytes.NewReader(b), int64(len(b)))
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err := ffi.GeneratePieceCIDFromFile(pt, pf, abi.UnpaddedPieceSize(len(b)))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, werr())
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
testCommEq := func(name string, in [][]byte, expect [][]byte) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
upi := make([]abi.PieceInfo, len(in))
|
||||
for i, b := range in {
|
||||
upi[i] = abi.PieceInfo{
|
||||
Size: abi.UnpaddedPieceSize(len(b)).Padded(),
|
||||
PieceCID: commP(b),
|
||||
}
|
||||
}
|
||||
|
||||
sectorPi := []abi.PieceInfo{
|
||||
{
|
||||
Size: 2048,
|
||||
PieceCID: commP(bytes.Join(expect, nil)),
|
||||
},
|
||||
}
|
||||
|
||||
expectCid, err := GenerateUnsealedCID(pt, sectorPi)
|
||||
require.NoError(t, err)
|
||||
|
||||
actualCid, err := GenerateUnsealedCID(pt, upi)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expectCid, actualCid)
|
||||
})
|
||||
}
|
||||
|
||||
barr := func(b byte, den int) []byte {
|
||||
return bytes.Repeat([]byte{b}, ups/den)
|
||||
}
|
||||
|
||||
// 0000
|
||||
testCommEq("zero",
|
||||
nil,
|
||||
[][]byte{barr(0, 1)},
|
||||
)
|
||||
|
||||
// 1111
|
||||
testCommEq("one",
|
||||
[][]byte{barr(1, 1)},
|
||||
[][]byte{barr(1, 1)},
|
||||
)
|
||||
|
||||
// 11 00
|
||||
testCommEq("one|2",
|
||||
[][]byte{barr(1, 2)},
|
||||
[][]byte{barr(1, 2), barr(0, 2)},
|
||||
)
|
||||
|
||||
// 1 0 00
|
||||
testCommEq("one|4",
|
||||
[][]byte{barr(1, 4)},
|
||||
[][]byte{barr(1, 4), barr(0, 4), barr(0, 2)},
|
||||
)
|
||||
|
||||
// 11 2 0
|
||||
testCommEq("one|2-two|4",
|
||||
[][]byte{barr(1, 2), barr(2, 4)},
|
||||
[][]byte{barr(1, 2), barr(2, 4), barr(0, 4)},
|
||||
)
|
||||
|
||||
// 1 0 22
|
||||
testCommEq("one|4-two|2",
|
||||
[][]byte{barr(1, 4), barr(2, 2)},
|
||||
[][]byte{barr(1, 4), barr(0, 4), barr(2, 2)},
|
||||
)
|
||||
|
||||
// 1 0 22 0000
|
||||
testCommEq("one|8-two|4",
|
||||
[][]byte{barr(1, 8), barr(2, 4)},
|
||||
[][]byte{barr(1, 8), barr(0, 8), barr(2, 4), barr(0, 2)},
|
||||
)
|
||||
|
||||
// 11 2 0 0000
|
||||
testCommEq("one|4-two|8",
|
||||
[][]byte{barr(1, 4), barr(2, 8)},
|
||||
[][]byte{barr(1, 4), barr(2, 8), barr(0, 8), barr(0, 2)},
|
||||
)
|
||||
|
||||
// 1 0 22 3 0 00 4444 5 0 00
|
||||
testCommEq("one|16-two|8-three|16-four|4-five|16",
|
||||
[][]byte{barr(1, 16), barr(2, 8), barr(3, 16), barr(4, 4), barr(5, 16)},
|
||||
[][]byte{barr(1, 16), barr(0, 16), barr(2, 8), barr(3, 16), barr(0, 16), barr(0, 8), barr(4, 4), barr(5, 16), barr(0, 16), barr(0, 8)},
|
||||
)
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []abi.SectorInfo, randomness abi.PoStRandomness) ([]abi.PoStProof, error) {
|
||||
randomness[31] = 0 // TODO: Not correct, fixme
|
||||
randomness[31] &= 0x3f
|
||||
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof) // TODO: FAULTS?
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -29,7 +29,7 @@ func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID,
|
||||
}
|
||||
|
||||
func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []abi.SectorInfo, randomness abi.PoStRandomness) ([]abi.PoStProof, []abi.SectorID, error) {
|
||||
randomness[31] = 0 // TODO: Not correct, fixme
|
||||
randomness[31] &= 0x3f
|
||||
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
|
||||
if err != nil {
|
||||
return nil, nil, xerrors.Errorf("gathering sector info: %w", err)
|
||||
@ -62,7 +62,7 @@ func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorIn
|
||||
|
||||
sid := abi.SectorID{Miner: mid, Number: s.SectorNumber}
|
||||
|
||||
paths, d, err := sb.sectors.AcquireSector(ctx, sid, stores.FTCache|stores.FTSealed, 0, false)
|
||||
paths, d, err := sb.sectors.AcquireSector(ctx, sid, stores.FTCache|stores.FTSealed, 0, stores.PathStorage)
|
||||
if err != nil {
|
||||
log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err)
|
||||
skipped = append(skipped, sid)
|
||||
@ -98,7 +98,7 @@ func (proofVerifier) VerifySeal(info abi.SealVerifyInfo) (bool, error) {
|
||||
}
|
||||
|
||||
func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info abi.WinningPoStVerifyInfo) (bool, error) {
|
||||
info.Randomness[31] = 0 // TODO: Not correct, fixme
|
||||
info.Randomness[31] &= 0x3f
|
||||
_, span := trace.StartSpan(ctx, "VerifyWinningPoSt")
|
||||
defer span.End()
|
||||
|
||||
@ -106,7 +106,7 @@ func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info abi.WinningPoSt
|
||||
}
|
||||
|
||||
func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info abi.WindowPoStVerifyInfo) (bool, error) {
|
||||
info.Randomness[31] = 0 // TODO: Not correct, fixme
|
||||
info.Randomness[31] &= 0x3f
|
||||
_, span := trace.StartSpan(ctx, "VerifyWindowPoSt")
|
||||
defer span.End()
|
||||
|
||||
@ -114,6 +114,6 @@ func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info abi.WindowPoStVe
|
||||
}
|
||||
|
||||
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
|
||||
randomness[31] = 0 // TODO: Not correct, fixme
|
||||
randomness[31] &= 0x3f
|
||||
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
|
||||
}
|
||||
|
28
fsutil/dealloc_linux.go
Normal file
28
fsutil/dealloc_linux.go
Normal file
@ -0,0 +1,28 @@
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
var log = logging.Logger("fsutil")
|
||||
|
||||
const FallocFlPunchHole = 0x02 // linux/falloc.h
|
||||
|
||||
func Deallocate(file *os.File, offset int64, length int64) error {
|
||||
if length == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := syscall.Fallocate(int(file.Fd()), FallocFlPunchHole, offset, length)
|
||||
if errno, ok := err.(syscall.Errno); ok {
|
||||
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
|
||||
log.Warnf("could not deallocate space, ignoring: %v", errno)
|
||||
err = nil // log and ignore
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
17
fsutil/dealloc_other.go
Normal file
17
fsutil/dealloc_other.go
Normal file
@ -0,0 +1,17 @@
|
||||
// +build !linux
|
||||
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
var log = logging.Logger("fsutil")
|
||||
|
||||
func Deallocate(file *os.File, offset int64, length int64) error {
|
||||
log.Warnf("deallocating space not supported")
|
||||
|
||||
return nil
|
||||
}
|
25
fsutil/filesize_unix.go
Normal file
25
fsutil/filesize_unix.go
Normal file
@ -0,0 +1,25 @@
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type SizeInfo struct {
|
||||
OnDisk int64
|
||||
}
|
||||
|
||||
// FileSize returns bytes used by a file on disk
|
||||
func FileSize(path string) (SizeInfo, error) {
|
||||
var stat syscall.Stat_t
|
||||
if err := syscall.Stat(path, &stat); err != nil {
|
||||
return SizeInfo{}, xerrors.Errorf("stat: %w", err)
|
||||
}
|
||||
|
||||
// NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize
|
||||
// See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html
|
||||
return SizeInfo{
|
||||
int64(stat.Blocks) * 512,
|
||||
}, nil
|
||||
}
|
7
fsutil/statfs.go
Normal file
7
fsutil/statfs.go
Normal file
@ -0,0 +1,7 @@
|
||||
package fsutil
|
||||
|
||||
type FsStat struct {
|
||||
Capacity int64
|
||||
Available int64 // Available to use for sector storage
|
||||
Reserved int64
|
||||
}
|
19
fsutil/statfs_unix.go
Normal file
19
fsutil/statfs_unix.go
Normal file
@ -0,0 +1,19 @@
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
func Statfs(path string) (FsStat, error) {
|
||||
var stat syscall.Statfs_t
|
||||
if err := syscall.Statfs(path, &stat); err != nil {
|
||||
return FsStat{}, xerrors.Errorf("statfs: %w", err)
|
||||
}
|
||||
|
||||
return FsStat{
|
||||
Capacity: int64(stat.Blocks) * int64(stat.Bsize),
|
||||
Available: int64(stat.Bavail) * int64(stat.Bsize),
|
||||
}, nil
|
||||
}
|
28
fsutil/statfs_windows.go
Normal file
28
fsutil/statfs_windows.go
Normal file
@ -0,0 +1,28 @@
|
||||
package fsutil
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
func Statfs(volumePath string) (FsStat, error) {
|
||||
// From https://github.com/ricochet2200/go-disk-usage/blob/master/du/diskusage_windows.go
|
||||
|
||||
h := syscall.MustLoadDLL("kernel32.dll")
|
||||
c := h.MustFindProc("GetDiskFreeSpaceExW")
|
||||
|
||||
var freeBytes int64
|
||||
var totalBytes int64
|
||||
var availBytes int64
|
||||
|
||||
c.Call(
|
||||
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(volumePath))),
|
||||
uintptr(unsafe.Pointer(&freeBytes)),
|
||||
uintptr(unsafe.Pointer(&totalBytes)),
|
||||
uintptr(unsafe.Pointer(&availBytes)))
|
||||
|
||||
return FsStat{
|
||||
Capacity: totalBytes,
|
||||
Available: availBytes,
|
||||
}, nil
|
||||
}
|
2
go.mod
2
go.mod
@ -6,7 +6,7 @@ require (
|
||||
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
|
||||
github.com/elastic/go-sysinfo v1.3.0
|
||||
github.com/filecoin-project/filecoin-ffi v0.0.0-20200326153646-e899cc1dd072
|
||||
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e
|
||||
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1
|
||||
github.com/filecoin-project/specs-actors v0.6.1
|
||||
|
4
go.sum
4
go.sum
@ -34,8 +34,8 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2/
|
||||
github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
|
||||
github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU=
|
||||
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
|
||||
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw=
|
||||
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=
|
||||
|
@ -61,14 +61,22 @@ type localWorkerPathProvider struct {
|
||||
}
|
||||
|
||||
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
|
||||
|
||||
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
|
||||
if err != nil {
|
||||
return stores.SectorPaths{}, nil, err
|
||||
}
|
||||
|
||||
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal)
|
||||
if err != nil {
|
||||
return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
|
||||
|
||||
return paths, func() {
|
||||
releaseStorage()
|
||||
|
||||
for _, fileType := range pathTypes {
|
||||
if fileType&allocate == 0 {
|
||||
continue
|
||||
@ -171,8 +179,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k
|
||||
return xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
|
||||
if err := l.storage.Remove(ctx, sector, stores.FTUnsealed, true); err != nil {
|
||||
return xerrors.Errorf("removing unsealed data: %w", err)
|
||||
if len(keepUnsealed) == 0 {
|
||||
if err := l.storage.Remove(ctx, sector, stores.FTUnsealed, true); err != nil {
|
||||
return xerrors.Errorf("removing unsealed data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
10
manager.go
10
manager.go
@ -3,6 +3,7 @@ package sectorstorage
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
@ -218,12 +219,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
||||
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
|
||||
|
||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, true, stores.AcquireCopy); err != nil {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
|
||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||
}
|
||||
|
||||
if len(best) > 0 {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, true, stores.AcquireMove); err != nil {
|
||||
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil {
|
||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||
}
|
||||
}
|
||||
@ -441,7 +442,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
return xerrors.Errorf("implement me")
|
||||
log.Warnw("ReleaseUnsealed todo")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
@ -490,7 +492,7 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
|
||||
func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
|
||||
return m.storage.FsStat(ctx, id)
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"io/ioutil"
|
||||
@ -28,6 +29,10 @@ func init() {
|
||||
|
||||
type testStorage stores.StorageConfig
|
||||
|
||||
func (t testStorage) DiskUsage(path string) (int64, error) {
|
||||
return 1, nil // close enough
|
||||
}
|
||||
|
||||
func newTestStorage(t *testing.T) *testStorage {
|
||||
tp, err := ioutil.TempDir(os.TempDir(), "sector-storage-test-")
|
||||
require.NoError(t, err)
|
||||
@ -69,8 +74,8 @@ func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testStorage) Stat(path string) (stores.FsStat, error) {
|
||||
return stores.Stat(path)
|
||||
func (t *testStorage) Stat(path string) (fsutil.FsStat, error) {
|
||||
return fsutil.Statfs(path)
|
||||
}
|
||||
|
||||
var _ stores.LocalStorage = &testStorage{}
|
||||
|
@ -320,7 +320,7 @@ func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID, []storage.Ra
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
|
||||
panic("implement me")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error {
|
||||
|
@ -19,15 +19,17 @@ const (
|
||||
FTNone SectorFileType = 0
|
||||
)
|
||||
|
||||
const FSOverheadDen = 10
|
||||
|
||||
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
|
||||
FTUnsealed: 10,
|
||||
FTSealed: 10,
|
||||
FTCache: 70, // TODO: confirm for 32G
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTCache: 141, // 11 layers + D(2x ssize) + C + R
|
||||
}
|
||||
|
||||
var FsOverheadFinalized = map[SectorFileType]int{
|
||||
FTUnsealed: 10,
|
||||
FTSealed: 10,
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTCache: 2,
|
||||
}
|
||||
|
||||
@ -67,7 +69,7 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredSealProof) (uint64, error
|
||||
return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
|
||||
}
|
||||
|
||||
need += uint64(oh) * uint64(ssize) / 10
|
||||
need += uint64(oh) * uint64(ssize) / FSOverheadDen
|
||||
}
|
||||
|
||||
return need, nil
|
||||
|
@ -72,13 +72,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
||||
// The caller has a lock on this sector already, no need to get one here
|
||||
|
||||
// passing 0 spt because we don't allocate anything
|
||||
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
|
||||
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, PathStorage, AcquireMove)
|
||||
if err != nil {
|
||||
log.Error("%+v", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: reserve local storage here
|
||||
|
||||
path := PathByType(paths, ft)
|
||||
if path == "" {
|
||||
log.Error("acquired path was empty")
|
||||
|
@ -2,6 +2,7 @@ package stores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"net/url"
|
||||
gopath "path"
|
||||
"sort"
|
||||
@ -34,7 +35,7 @@ type StorageInfo struct {
|
||||
}
|
||||
|
||||
type HealthReport struct {
|
||||
Stat FsStat
|
||||
Stat fsutil.FsStat
|
||||
Err error
|
||||
}
|
||||
|
||||
@ -50,7 +51,7 @@ type SectorStorageInfo struct {
|
||||
}
|
||||
|
||||
type SectorIndex interface { // part of storage-miner api
|
||||
StorageAttach(context.Context, StorageInfo, FsStat) error
|
||||
StorageAttach(context.Context, StorageInfo, fsutil.FsStat) error
|
||||
StorageInfo(context.Context, ID) (StorageInfo, error)
|
||||
StorageReportHealth(context.Context, ID, HealthReport) error
|
||||
|
||||
@ -77,7 +78,7 @@ type declMeta struct {
|
||||
|
||||
type storageEntry struct {
|
||||
info *StorageInfo
|
||||
fsi FsStat
|
||||
fsi fsutil.FsStat
|
||||
|
||||
lastHeartbeat time.Time
|
||||
heartbeatErr error
|
||||
@ -130,7 +131,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) error {
|
||||
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsStat) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
|
||||
@ -361,7 +362,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
|
||||
continue
|
||||
}
|
||||
|
||||
if spaceReq > p.fsi.Available {
|
||||
if spaceReq > uint64(p.fsi.Available) {
|
||||
log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
|
||||
continue
|
||||
}
|
||||
|
@ -2,18 +2,15 @@ package stores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
)
|
||||
|
||||
type PathType bool
|
||||
type PathType string
|
||||
|
||||
const (
|
||||
PathStorage = false
|
||||
PathSealing = true
|
||||
PathStorage = "storage"
|
||||
PathSealing = "sealing"
|
||||
)
|
||||
|
||||
type AcquireMode string
|
||||
@ -34,23 +31,5 @@ type Store interface {
|
||||
// move sectors into storage
|
||||
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error
|
||||
|
||||
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||
}
|
||||
|
||||
func Stat(path string) (FsStat, error) {
|
||||
var stat syscall.Statfs_t
|
||||
if err := syscall.Statfs(path, &stat); err != nil {
|
||||
return FsStat{}, xerrors.Errorf("statfs: %w", err)
|
||||
}
|
||||
|
||||
return FsStat{
|
||||
Capacity: stat.Blocks * uint64(stat.Bsize),
|
||||
Available: stat.Bavail * uint64(stat.Bsize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type FsStat struct {
|
||||
Capacity uint64
|
||||
Available uint64 // Available to use for sector storage
|
||||
Used uint64
|
||||
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
|
||||
}
|
||||
|
126
stores/local.go
126
stores/local.go
@ -3,6 +3,7 @@ package stores
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"io/ioutil"
|
||||
"math/bits"
|
||||
"math/rand"
|
||||
@ -48,7 +49,8 @@ type LocalStorage interface {
|
||||
GetStorage() (StorageConfig, error)
|
||||
SetStorage(func(*StorageConfig)) error
|
||||
|
||||
Stat(path string) (FsStat, error)
|
||||
Stat(path string) (fsutil.FsStat, error)
|
||||
DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory
|
||||
}
|
||||
|
||||
const MetaFile = "sectorstore.json"
|
||||
@ -67,6 +69,50 @@ type Local struct {
|
||||
|
||||
type path struct {
|
||||
local string // absolute local path
|
||||
|
||||
reserved int64
|
||||
reservations map[abi.SectorID]SectorFileType
|
||||
}
|
||||
|
||||
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
|
||||
stat, err := ls.Stat(p.local)
|
||||
if err != nil {
|
||||
return fsutil.FsStat{}, err
|
||||
}
|
||||
|
||||
stat.Reserved = p.reserved
|
||||
|
||||
for id, ft := range p.reservations {
|
||||
for _, fileType := range PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
used, err := ls.DiskUsage(p.sectorPath(id, fileType))
|
||||
if err != nil {
|
||||
log.Errorf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
|
||||
continue
|
||||
}
|
||||
|
||||
stat.Reserved -= used
|
||||
}
|
||||
}
|
||||
|
||||
if stat.Reserved < 0 {
|
||||
log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
|
||||
stat.Reserved = 0
|
||||
}
|
||||
|
||||
stat.Available -= stat.Reserved
|
||||
if stat.Available < 0 {
|
||||
stat.Available = 0
|
||||
}
|
||||
|
||||
return stat, err
|
||||
}
|
||||
|
||||
func (p *path) sectorPath(sid abi.SectorID, fileType SectorFileType) string {
|
||||
return filepath.Join(p.local, fileType.String(), SectorName(sid))
|
||||
}
|
||||
|
||||
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
|
||||
@ -98,9 +144,12 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
|
||||
|
||||
out := &path{
|
||||
local: p,
|
||||
|
||||
reserved: 0,
|
||||
reservations: map[abi.SectorID]SectorFileType{},
|
||||
}
|
||||
|
||||
fst, err := st.localStorage.Stat(p)
|
||||
fst, err := out.stat(st.localStorage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -179,7 +228,7 @@ func (st *Local) reportHealth(ctx context.Context) {
|
||||
|
||||
toReport := map[ID]HealthReport{}
|
||||
for id, p := range st.paths {
|
||||
stat, err := st.localStorage.Stat(p.local)
|
||||
stat, err := p.stat(st.localStorage)
|
||||
|
||||
toReport[id] = HealthReport{
|
||||
Stat: stat,
|
||||
@ -197,6 +246,61 @@ func (st *Local) reportHealth(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) {
|
||||
ssize, err := spt.SectorSize()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting sector size: %w", err)
|
||||
}
|
||||
|
||||
st.localLk.Lock()
|
||||
|
||||
done := func() {}
|
||||
deferredDone := func() { done() }
|
||||
defer func() {
|
||||
st.localLk.Unlock()
|
||||
deferredDone()
|
||||
}()
|
||||
|
||||
for _, fileType := range PathTypes {
|
||||
if fileType&ft == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
id := ID(PathByType(storageIDs, fileType))
|
||||
|
||||
p, ok := st.paths[id]
|
||||
if !ok {
|
||||
return nil, errPathNotFound
|
||||
}
|
||||
|
||||
stat, err := p.stat(st.localStorage)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen
|
||||
|
||||
if stat.Available < overhead {
|
||||
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)
|
||||
}
|
||||
|
||||
p.reserved += overhead
|
||||
|
||||
prevDone := done
|
||||
done = func() {
|
||||
prevDone()
|
||||
|
||||
st.localLk.Lock()
|
||||
defer st.localLk.Unlock()
|
||||
|
||||
p.reserved -= overhead
|
||||
}
|
||||
}
|
||||
|
||||
deferredDone = func() {}
|
||||
return done, nil
|
||||
}
|
||||
|
||||
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
|
||||
if existing|allocate != existing^allocate {
|
||||
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||
@ -229,7 +333,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
|
||||
continue
|
||||
}
|
||||
|
||||
spath := filepath.Join(p.local, fileType.String(), SectorName(sid))
|
||||
spath := p.sectorPath(sid, fileType)
|
||||
SetPathByType(&out, fileType, spath)
|
||||
SetPathByType(&storageIDs, fileType, string(info.ID))
|
||||
|
||||
@ -271,7 +375,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
|
||||
|
||||
// TODO: Check free space
|
||||
|
||||
best = filepath.Join(p.local, fileType.String(), SectorName(sid))
|
||||
best = p.sectorPath(sid, fileType)
|
||||
bestID = si.ID
|
||||
}
|
||||
|
||||
@ -387,7 +491,7 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
|
||||
return xerrors.Errorf("dropping sector from index: %w", err)
|
||||
}
|
||||
|
||||
spath := filepath.Join(p.local, typ.String(), SectorName(sid))
|
||||
spath := p.sectorPath(sid, typ)
|
||||
log.Infof("remove %s", spath)
|
||||
|
||||
if err := os.RemoveAll(spath); err != nil {
|
||||
@ -398,12 +502,12 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
|
||||
}
|
||||
|
||||
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error {
|
||||
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
|
||||
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, PathStorage, AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||
}
|
||||
|
||||
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
|
||||
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire src storage: %w", err)
|
||||
}
|
||||
@ -454,16 +558,16 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
|
||||
|
||||
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
||||
|
||||
func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||
func (st *Local) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
|
||||
st.localLk.RLock()
|
||||
defer st.localLk.RUnlock()
|
||||
|
||||
p, ok := st.paths[id]
|
||||
if !ok {
|
||||
return FsStat{}, errPathNotFound
|
||||
return fsutil.FsStat{}, errPathNotFound
|
||||
}
|
||||
|
||||
return st.localStorage.Stat(p.local)
|
||||
return p.stat(st.localStorage)
|
||||
}
|
||||
|
||||
var _ Store = &Local{}
|
||||
|
@ -3,6 +3,7 @@ package stores
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"github.com/google/uuid"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -19,6 +20,10 @@ type TestingLocalStorage struct {
|
||||
c StorageConfig
|
||||
}
|
||||
|
||||
func (t *TestingLocalStorage) DiskUsage(path string) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
|
||||
return t.c, nil
|
||||
}
|
||||
@ -28,11 +33,10 @@ func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TestingLocalStorage) Stat(path string) (FsStat, error) {
|
||||
return FsStat{
|
||||
func (t *TestingLocalStorage) Stat(path string) (fsutil.FsStat, error) {
|
||||
return fsutil.FsStat{
|
||||
Capacity: pathSize,
|
||||
Available: pathSize,
|
||||
Used: 0,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package stores
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"io/ioutil"
|
||||
"math/bits"
|
||||
"mime"
|
||||
@ -270,7 +271,7 @@ func (r *Remote) deleteFromRemote(ctx context.Context, url string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||
func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
|
||||
st, err := r.local.FsStat(ctx, id)
|
||||
switch err {
|
||||
case nil:
|
||||
@ -278,53 +279,53 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||
case errPathNotFound:
|
||||
break
|
||||
default:
|
||||
return FsStat{}, xerrors.Errorf("local stat: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("local stat: %w", err)
|
||||
}
|
||||
|
||||
si, err := r.index.StorageInfo(ctx, id)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("getting remote storage info: %w", err)
|
||||
}
|
||||
|
||||
if len(si.URLs) == 0 {
|
||||
return FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id)
|
||||
}
|
||||
|
||||
rl, err := url.Parse(si.URLs[0])
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("failed to parse url: %w", err)
|
||||
}
|
||||
|
||||
rl.Path = gopath.Join(rl.Path, "stat", string(id))
|
||||
|
||||
req, err := http.NewRequest("GET", rl.String(), nil)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("request: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("do request: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
switch resp.StatusCode {
|
||||
case 200:
|
||||
break
|
||||
case 404:
|
||||
return FsStat{}, errPathNotFound
|
||||
return fsutil.FsStat{}, errPathNotFound
|
||||
case 500:
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err)
|
||||
}
|
||||
|
||||
return FsStat{}, xerrors.Errorf("fsstat: got http 500: %s", string(b))
|
||||
return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500: %s", string(b))
|
||||
}
|
||||
|
||||
var out FsStat
|
||||
var out fsutil.FsStat
|
||||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||
return FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
|
||||
return fsutil.FsStat{}, xerrors.Errorf("decoding fsstat: %w", err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user