changes as per review
This commit is contained in:
parent
2c9f5922b5
commit
50e023edd3
3
extern/sector-storage/piece_provider.go
vendored
3
extern/sector-storage/piece_provider.go
vendored
@ -71,6 +71,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
|
|||||||
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
|
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
|
||||||
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
|
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
|
||||||
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
|
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
|
||||||
|
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
|
||||||
|
// the returned boolean parameter will be set to true.
|
||||||
|
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
|
||||||
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||||
if err := offset.Valid(); err != nil {
|
if err := offset.Valid(); err != nil {
|
||||||
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
|
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
|
||||||
|
1
extern/sector-storage/stores/http_handler.go
vendored
1
extern/sector-storage/stores/http_handler.go
vendored
@ -155,7 +155,6 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
w.WriteHeader(200)
|
|
||||||
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
|
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
|
||||||
http.ServeFile(w, r, path)
|
http.ServeFile(w, r, path)
|
||||||
}
|
}
|
||||||
|
10
extern/sector-storage/stores/remote.go
vendored
10
extern/sector-storage/stores/remote.go
vendored
@ -560,6 +560,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
return si[i].Weight > si[j].Weight
|
return si[i].Weight > si[j].Weight
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
for _, info := range si {
|
for _, info := range si {
|
||||||
for _, url := range info.URLs {
|
for _, url := range info.URLs {
|
||||||
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
|
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
|
||||||
@ -567,6 +568,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
|
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("check if remote has piece", "url", url, "error", err)
|
log.Warnw("check if remote has piece", "url", url, "error", err)
|
||||||
|
lastErr = err
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -578,6 +580,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
rd, err := r.readRemote(ctx, url, offset, size)
|
rd, err := r.readRemote(ctx, url, offset, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("reading from remote", "url", url, "error", err)
|
log.Warnw("reading from remote", "url", url, "error", err)
|
||||||
|
lastErr = err
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
|
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
|
||||||
@ -586,12 +589,15 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
}
|
}
|
||||||
|
|
||||||
// we couldn't find a unsealed file with the unsealed piece, will return a nil reader.
|
// we couldn't find a unsealed file with the unsealed piece, will return a nil reader.
|
||||||
log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d)", s, offset, size)
|
log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d), last error=%s", s, offset, size, lastErr)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
|
func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
|
||||||
panic("not implemented")
|
log.Warnf("reserve called on remote store, sectorID: %v", sid.ID)
|
||||||
|
return func() {
|
||||||
|
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Store = &Remote{}
|
var _ Store = &Remote{}
|
||||||
|
5
extern/sector-storage/stores/util_unix.go
vendored
5
extern/sector-storage/stores/util_unix.go
vendored
@ -2,6 +2,7 @@ package stores
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -37,6 +38,10 @@ func move(from, to string) error {
|
|||||||
|
|
||||||
var cmd *exec.Cmd
|
var cmd *exec.Cmd
|
||||||
if runtime.GOOS == "darwin" {
|
if runtime.GOOS == "darwin" {
|
||||||
|
if err := os.MkdirAll(toDir, 0777); err != nil {
|
||||||
|
return xerrors.Errorf("failed exec MkdirAll: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
cmd = exec.Command("/usr/bin/env", "mv", from, toDir) // nolint
|
cmd = exec.Command("/usr/bin/env", "mv", from, toDir) // nolint
|
||||||
} else {
|
} else {
|
||||||
cmd = exec.Command("/usr/bin/env", "mv", "-t", toDir, from) // nolint
|
cmd = exec.Command("/usr/bin/env", "mv", "-t", toDir, from) // nolint
|
||||||
|
Loading…
Reference in New Issue
Block a user