Merge pull request #7681 from filecoin-project/feat/storage-tar-buf
storage: Use 1M buffers for Tar transfers
This commit is contained in:
commit
ab55a6fbbe
14
extern/sector-storage/stores/http_handler.go
vendored
14
extern/sector-storage/stores/http_handler.go
vendored
@ -2,7 +2,6 @@ package stores
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -139,17 +138,12 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rd, err := tarutil.TarDirectory(path)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/x-tar")
|
w.Header().Set("Content-Type", "application/x-tar")
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
if _, err := io.CopyBuffer(w, rd, make([]byte, CopyBuf)); err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
err := tarutil.TarDirectory(path, w, make([]byte, CopyBuf))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("send tar: %+v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
2
extern/sector-storage/stores/remote.go
vendored
2
extern/sector-storage/stores/remote.go
vendored
@ -281,7 +281,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
|
|
||||||
switch mediatype {
|
switch mediatype {
|
||||||
case "application/x-tar":
|
case "application/x-tar":
|
||||||
return tarutil.ExtractTar(resp.Body, outname)
|
return tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
|
||||||
case "application/octet-stream":
|
case "application/octet-stream":
|
||||||
f, err := os.Create(outname)
|
f, err := os.Create(outname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
18
extern/sector-storage/tarutil/systar.go
vendored
18
extern/sector-storage/tarutil/systar.go
vendored
@ -14,7 +14,7 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("tarutil") // nolint
|
var log = logging.Logger("tarutil") // nolint
|
||||||
|
|
||||||
func ExtractTar(body io.Reader, dir string) error {
|
func ExtractTar(body io.Reader, dir string, buf []byte) error {
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
|
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
|
||||||
return xerrors.Errorf("mkdir: %w", err)
|
return xerrors.Errorf("mkdir: %w", err)
|
||||||
}
|
}
|
||||||
@ -38,7 +38,7 @@ func ExtractTar(body io.Reader, dir string) error {
|
|||||||
|
|
||||||
// This data is coming from a trusted source, no need to check the size.
|
// This data is coming from a trusted source, no need to check the size.
|
||||||
//nolint:gosec
|
//nolint:gosec
|
||||||
if _, err := io.Copy(f, tr); err != nil {
|
if _, err := io.CopyBuffer(f, tr, buf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,17 +48,7 @@ func ExtractTar(body io.Reader, dir string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TarDirectory(dir string) (io.ReadCloser, error) {
|
func TarDirectory(dir string, w io.Writer, buf []byte) error {
|
||||||
r, w := io.Pipe()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
_ = w.CloseWithError(writeTarDirectory(dir, w))
|
|
||||||
}()
|
|
||||||
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeTarDirectory(dir string, w io.Writer) error {
|
|
||||||
tw := tar.NewWriter(w)
|
tw := tar.NewWriter(w)
|
||||||
|
|
||||||
files, err := ioutil.ReadDir(dir)
|
files, err := ioutil.ReadDir(dir)
|
||||||
@ -81,7 +71,7 @@ func writeTarDirectory(dir string, w io.Writer) error {
|
|||||||
return xerrors.Errorf("opening %s for reading: %w", file.Name(), err)
|
return xerrors.Errorf("opening %s for reading: %w", file.Name(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := io.Copy(tw, f); err != nil {
|
if _, err := io.CopyBuffer(tw, f, buf); err != nil {
|
||||||
return xerrors.Errorf("copy data for file %s: %w", file.Name(), err)
|
return xerrors.Errorf("copy data for file %s: %w", file.Name(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user