package stores import ( "context" "encoding/json" "fmt" "io" "io/ioutil" "math/bits" "mime" "net/http" "net/url" "os" gopath "path" "path/filepath" "sort" "sync" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/partialfile" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/tarutil" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-storage/storage" "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" ) var FetchTempSubdir = "fetching" var CopyBuf = 1 << 20 type Remote struct { local *Local index SectorIndex auth http.Header limit chan struct{} fetchLk sync.Mutex fetching map[abi.SectorID]chan struct{} } func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error { // TODO: do this on remotes too // (not that we really need to do that since it's always called by the // worker which pulled the copy) return r.local.RemoveCopies(ctx, s, types) } func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int) *Remote { return &Remote{ local: local, index: index, auth: auth, limit: make(chan struct{}, fetchLimit), fetching: map[abi.SectorID]chan struct{}{}, } } func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } for { r.fetchLk.Lock() c, locked := r.fetching[s.ID] if !locked { r.fetching[s.ID] = make(chan struct{}) r.fetchLk.Unlock() break } r.fetchLk.Unlock() select { case <-c: continue case <-ctx.Done(): return storiface.SectorPaths{}, storiface.SectorPaths{}, ctx.Err() } } defer func() { r.fetchLk.Lock() close(r.fetching[s.ID]) delete(r.fetching, s.ID) r.fetchLk.Unlock() }() paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) } var toFetch storiface.SectorFileType for _, fileType := range storiface.PathTypes { if fileType&existing == 0 { continue } if storiface.PathByType(paths, fileType) == "" { toFetch |= fileType } } apaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) } odt := storiface.FSOverheadSeal if pathType == storiface.PathStorage { odt = storiface.FsOverheadFinalized } releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) } defer releaseStorage() for _, fileType := range storiface.PathTypes { if fileType&existing == 0 { continue } if storiface.PathByType(paths, fileType) != "" { continue } dest := storiface.PathByType(apaths, fileType) storageID := storiface.PathByType(ids, fileType) url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, err } storiface.SetPathByType(&paths, fileType, dest) storiface.SetPathByType(&stores, fileType, storageID) if err := r.index.StorageDeclareSector(ctx, ID(storageID), s.ID, fileType, op == storiface.AcquireMove); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) continue } if op == storiface.AcquireMove { if err := r.deleteFromRemote(ctx, url); err != nil { log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) } } } return paths, stores, nil } func tempFetchDest(spath string, create bool) (string, error) { st, b := filepath.Split(spath) tempdir := filepath.Join(st, FetchTempSubdir) if create { if err := os.MkdirAll(tempdir, 0755); err != nil { // nolint return "", xerrors.Errorf("creating temp fetch dir: %w", err) } } return filepath.Join(tempdir, b), nil } func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) { si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false) if err != nil { return "", err } if len(si) == 0 { return "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound) } sort.Slice(si, func(i, j int) bool { return si[i].Weight < si[j].Weight }) var merr error for _, info := range si { // TODO: see what we have local, prefer that for _, url := range info.URLs { tempDest, err := tempFetchDest(dest, true) if err != nil { return "", err } if err := os.RemoveAll(dest); err != nil { return "", xerrors.Errorf("removing dest: %w", err) } err = r.fetch(ctx, url, tempDest) if err != nil { merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err)) continue } if err := move(tempDest, dest); err != nil { return "", xerrors.Errorf("fetch move error (storage %s) %s -> %s: %w", info.ID, tempDest, dest, err) } if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } return url, nil } } return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } func (r *Remote) fetch(ctx context.Context, url, outname string) error { log.Infof("Fetch %s -> %s", url, outname) if len(r.limit) >= cap(r.limit) { log.Infof("Throttling fetch, %d already running", len(r.limit)) } // TODO: Smarter throttling // * Priority (just going sequentially is still pretty good) // * Per interface // * Aware of remote load select { case r.limit <- struct{}{}: defer func() { <-r.limit }() case <-ctx.Done(): return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err()) } req, err := http.NewRequest("GET", url, nil) if err != nil { return xerrors.Errorf("request: %w", err) } req.Header = r.auth req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { return xerrors.Errorf("do request: %w", err) } defer resp.Body.Close() // nolint if resp.StatusCode != 200 { return xerrors.Errorf("non-200 code: %d", resp.StatusCode) } /*bar := pb.New64(w.sizeForType(typ)) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES barreader := bar.NewProxyReader(resp.Body) bar.Start() defer bar.Finish()*/ mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) if err != nil { return xerrors.Errorf("parse media type: %w", err) } if err := os.RemoveAll(outname); err != nil { return xerrors.Errorf("removing dest: %w", err) } switch mediatype { case "application/x-tar": return tarutil.ExtractTar(resp.Body, outname) case "application/octet-stream": f, err := os.Create(outname) if err != nil { return err } _, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf)) if err != nil { f.Close() // nolint return err } return f.Close() default: return xerrors.Errorf("unknown content type: '%s'", mediatype) } } func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error { // Make sure we have the data local _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return xerrors.Errorf("acquire src storage (remote): %w", err) } return r.local.MoveStorage(ctx, s, types) } func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool) error { if bits.OnesCount(uint(typ)) != 1 { return xerrors.New("delete expects one file type") } if err := r.local.Remove(ctx, sid, typ, force); err != nil { return xerrors.Errorf("remove from local: %w", err) } si, err := r.index.StorageFindSector(ctx, sid, typ, 0, false) if err != nil { return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) } for _, info := range si { for _, url := range info.URLs { if err := r.deleteFromRemote(ctx, url); err != nil { log.Warnf("remove %s: %+v", url, err) continue } break } } return nil } func (r *Remote) deleteFromRemote(ctx context.Context, url string) error { log.Infof("Delete %s", url) req, err := http.NewRequest("DELETE", url, nil) if err != nil { return xerrors.Errorf("request: %w", err) } req.Header = r.auth req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { return xerrors.Errorf("do request: %w", err) } defer resp.Body.Close() // nolint if resp.StatusCode != 200 { return xerrors.Errorf("non-200 code: %d", resp.StatusCode) } return nil } func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) { st, err := r.local.FsStat(ctx, id) switch err { case nil: return st, nil case errPathNotFound: break default: return fsutil.FsStat{}, xerrors.Errorf("local stat: %w", err) } si, err := r.index.StorageInfo(ctx, id) if err != nil { return fsutil.FsStat{}, xerrors.Errorf("getting remote storage info: %w", err) } if len(si.URLs) == 0 { return fsutil.FsStat{}, xerrors.Errorf("no known URLs for remote storage %s", id) } rl, err := url.Parse(si.URLs[0]) if err != nil { return fsutil.FsStat{}, xerrors.Errorf("failed to parse url: %w", err) } rl.Path = gopath.Join(rl.Path, "stat", string(id)) req, err := http.NewRequest("GET", rl.String(), nil) if err != nil { return fsutil.FsStat{}, xerrors.Errorf("request: %w", err) } req.Header = r.auth req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err) } switch resp.StatusCode { case 200: break case 404: return fsutil.FsStat{}, errPathNotFound case 500: b, err := ioutil.ReadAll(resp.Body) if err != nil { return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500, then failed to read the error: %w", err) } return fsutil.FsStat{}, xerrors.Errorf("fsstat: got http 500: %s", string(b)) } var out fsutil.FsStat if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return fsutil.FsStat{}, xerrors.Errorf("decoding fsstat: %w", err) } defer resp.Body.Close() // nolint return out, nil } func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) { url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded()) req, err := http.NewRequest("GET", url, nil) if err != nil { return false, xerrors.Errorf("request: %w", err) } req.Header = r.auth.Clone() req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { return false, xerrors.Errorf("do request: %w", err) } defer resp.Body.Close() // nolint switch resp.StatusCode { case http.StatusOK: return true, nil case http.StatusRequestedRangeNotSatisfiable: return false, nil default: return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode) } } func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { if len(r.limit) >= cap(r.limit) { log.Infof("Throttling remote read, %d already running", len(r.limit)) } // TODO: Smarter throttling // * Priority (just going sequentially is still pretty good) // * Per interface // * Aware of remote load select { case r.limit <- struct{}{}: defer func() { <-r.limit }() case <-ctx.Done(): return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err()) } req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, xerrors.Errorf("request: %w", err) } req.Header = r.auth.Clone() req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)) req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, xerrors.Errorf("do request: %w", err) } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { resp.Body.Close() // nolint return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode) } return resp.Body, nil } // Reader returns a reader for an unsealed piece at the given offset in the given sector. // If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy. // If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file // to know if they have the unsealed piece and will then read the unsealed piece data from a worker that has it. // // Returns a nil reader if : // 1. no worker(local worker included) has an unsealed file for the given sector OR // 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. // Will return a nil reader and a nil error in such a case. func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { ft := storiface.FTUnsealed // check if we have the unsealed sector file locally paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return nil, xerrors.Errorf("acquire local: %w", err) } path := storiface.PathByType(paths, ft) if path != "" { // if we have the unsealed file locally, return a reader that can be used to read the contents of the // unsealed piece. log.Infof("Read local %s (+%d,%d)", path, offset, size) ssize, err := s.ProofType.SectorSize() if err != nil { return nil, err } // open the unsealed sector file for the given sector size located at the given path. pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path) if err != nil { return nil, xerrors.Errorf("opening partial file: %w", err) } // even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece // in the unsealed sector file. That is what `HasAllocated` checks for. has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded()) if err != nil { return nil, xerrors.Errorf("has allocated: %w", err) } if !has { if err := pf.Close(); err != nil { return nil, xerrors.Errorf("close partial file: %w", err) } return nil, nil } log.Debugf("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) return pf.Reader(storiface.PaddedByteIndex(offset), size) } // --- We don't have the unsealed sector file locally // if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index // to determine which workers have the unsealed file and then query those workers to know // if they have the unsealed piece in the unsealed sector file. si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) if err != nil { return nil, err } if len(si) == 0 { return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) } // TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ? sort.Slice(si, func(i, j int) bool { return si[i].Weight < si[j].Weight }) for _, info := range si { for _, url := range info.URLs { // checkAllocated makes a JSON RPC query to a remote worker to determine if it has // unsealed piece in their unsealed sector file. ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) if err != nil { log.Warnw("check if remote has piece", "url", url, "error", err) continue } if !ok { continue } // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. rd, err := r.readRemote(ctx, url, offset, size) if err != nil { log.Warnw("reading from remote", "url", url, "error", err) continue } log.Infof("Read remote %s (+%d,%d)", url, offset, size) return rd, nil } } // we couldn't find a unsealed file with the unsealed piece, will return a nil reader. log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d)", s, offset, size) return nil, nil } var _ Store = &Remote{}