From 3e2d04a540e0e2dc9ef392abbf4cfde6a8d5f39f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 7 Aug 2019 14:41:27 +0200 Subject: [PATCH] deals: Use temp files for AddPiece --- chain/deals/handler.go | 33 +++++++-- lib/bytesink/fifo.go | 106 ----------------------------- lib/sectorbuilder/sectorbuilder.go | 84 +---------------------- node/impl/storminer.go | 17 +++-- 4 files changed, 43 insertions(+), 197 deletions(-) delete mode 100644 lib/bytesink/fifo.go diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 23beb70cb..741652a03 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -10,7 +10,10 @@ import ( files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" + "io" + "io/ioutil" "math" + "os" "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/lib/cborrpc" @@ -147,7 +150,7 @@ func (h *Handler) Run(ctx context.Context) { continue } - f, ok := n.(files.File) + uf, ok := n.(files.File) if !ok { // TODO: we probably got directory, how should we handle this in unixfs mode? log.Errorf("unsupported unixfs type") @@ -155,20 +158,42 @@ func (h *Handler) Run(ctx context.Context) { continue } - size, err := f.Size() + size, err := uf.Size() if err != nil { log.Errorf("failed to get file size: %s", err) // TODO: fail deal continue } - // TODO: can we use pipes? - sectorID, err := h.sb.AddPiece(ctx, deal.Proposal.PieceRef, uint64(size), f) + ////////////// + + f, err := ioutil.TempFile(os.TempDir(), "piece-temp-") + if err != nil { + log.Error(err) + // TODO: fail deal + continue + } + if _, err := io.Copy(f, uf); err != nil { + log.Error(err) + // TODO: fail deal + continue + } + if err := f.Close(); err != nil { + log.Error(err) + // TODO: fail deal + continue + } + sectorID, err := h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f.Name()) if err != nil { // TODO: fail deal log.Errorf("AddPiece failed: %s", err) continue } + if err := os.Remove(f.Name()); err != nil { + log.Error(err) + // TODO: fail deal + continue + } log.Warnf("New Sector: %d", sectorID) diff --git a/lib/bytesink/fifo.go b/lib/bytesink/fifo.go deleted file mode 100644 index d035fe83e..000000000 --- a/lib/bytesink/fifo.go +++ /dev/null @@ -1,106 +0,0 @@ -// +build !windows -// TODO: windows now has pipes, verify if it maybe works - -// TODO: extract from filecoin - -package bytesink - -import ( - "fmt" - "io/ioutil" - "os" - "syscall" - - "github.com/pkg/errors" -) - -// FifoByteSink is not safe for concurrent access, as writes to underlying pipe are atomic only -// if len(buf) is less than the OS-specific PIPE_BUF value. -type FifoByteSink struct { - file *os.File - path string -} - -// Open prepares the sink for writing by opening the backing FIFO file. Open -// will block until someone opens the FIFO file for reading. -func (s *FifoByteSink) Open() error { - file, err := os.OpenFile(s.path, os.O_WRONLY, os.ModeNamedPipe) - if err != nil { - return errors.Wrap(err, "failed to open pipe") - } - - s.file = file - - return nil -} - -// Write writes the provided buffer to the underlying pipe. Write will block -// until the provided buffer's bytes have been read from the read end of the -// pipe. -// -// Warning: Writes are atomic only if len(buf) is less than the OS-specific -// PIPE_BUF value. For more information, see: -// -// http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html -func (s *FifoByteSink) Write(buf []byte) (int, error) { - return s.file.Write(buf) -} - -// Close ensures that the underlying file is closed and removed. -func (s *FifoByteSink) Close() (retErr error) { - cerr := s.file.Close() - if cerr != nil { - return cerr - } - - defer func() { - rerr := os.Remove(s.path) - if retErr == nil { - retErr = rerr - } - }() - - return -} - -// ID produces a string-identifier for this byte sink. For now, this is just the -// path of the FIFO file. This string may get more structured in the future. -func (s *FifoByteSink) ID() string { - return s.path -} - -// NewFifo creates a FIFO pipe and returns a pointer to a FifoByteSink, which -// satisfies the ByteSink interface. The FIFO pipe is used to stream bytes to -// rust-fil-proofs from Go during the piece-adding flow. Writes to the pipe are -// buffered automatically by the OS; the size of the buffer varies. -func NewFifo() (*FifoByteSink, error) { - path, err := createTmpFifoPath() - if err != nil { - return nil, errors.Wrap(err, "creating FIFO path failed") - } - - err = syscall.Mkfifo(path, 0600) - if err != nil { - return nil, errors.Wrap(err, "mkfifo failed") - } - - return &FifoByteSink{ - path: path, - }, nil -} - -// createTmpFifoPath creates a path with which a temporary FIFO file may be -// created. -func createTmpFifoPath() (string, error) { - file, err := ioutil.TempFile("", "") - if err != nil { - return "", err - } - - err = file.Close() - if err != nil { - return "", err - } - - return fmt.Sprintf("%s.fifo", file.Name()), nil -} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 468238457..28fd72d22 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -3,17 +3,13 @@ package sectorbuilder import ( "context" "encoding/binary" - "io" "unsafe" - "golang.org/x/xerrors" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" logging "github.com/ipfs/go-log" "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/lib/bytesink" ) var log = logging.Logger("sectorbuilder") @@ -72,84 +68,8 @@ func (sb *SectorBuilder) Destroy() { sectorbuilder.DestroySectorBuilder(sb.handle) } -func (sb *SectorBuilder) AddPiece(ctx context.Context, pieceRef string, pieceSize uint64, pieceReader io.ReadCloser) (uint64, error) { - fifoFile, err := bytesink.NewFifo() - if err != nil { - return 0, err - } - - // errCh holds any error encountered when streaming bytes or making the CGO - // call. The channel is buffered so that the goroutines can exit, which will - // close the pipe, which unblocks the CGO call. - errCh := make(chan error, 2) - - // sectorIDCh receives a value if the CGO call indicates that the client - // piece has successfully been added to a sector. The channel is buffered - // so that the goroutine can exit if a value is sent to errCh before the - // CGO call completes. - sectorIDCh := make(chan uint64, 1) - - // goroutine attempts to copy bytes from piece's reader to the fifoFile - go func() { - // opening the fifoFile blocks the goroutine until a reader is opened on the - // other end of the FIFO pipe - err := fifoFile.Open() - if err != nil { - errCh <- xerrors.Errorf("failed to open fifoFile: %w", err) - return - } - - // closing theg s fifoFile signals to the reader that we're done writing, which - // unblocks the reader - defer func() { - err := fifoFile.Close() - if err != nil { - log.Warnf("failed to close fifoFile: %s", err) - } - }() - - n, err := io.Copy(fifoFile, pieceReader) - if err != nil { - errCh <- xerrors.Errorf("failed to copy to pipe: %w", err) - return - } - - if uint64(n) != pieceSize { - errCh <- xerrors.Errorf("expected to write %d bytes but wrote %d", pieceSize, n) - return - } - }() - - // goroutine makes CGO call, which blocks until FIFO pipe opened for writing - // from within other goroutine - go func() { - id, err := sectorbuilder.AddPiece(sb.handle, pieceRef, pieceSize, fifoFile.ID()) - if err != nil { - msg := "CGO add_piece returned an error (err=%s, fifo path=%s)" - log.Errorf(msg, err, fifoFile.ID()) - errCh <- err - return - } - - sectorIDCh <- id - }() - - select { - case <-ctx.Done(): - errStr := "context completed before CGO call could return" - strFmt := "%s (sinkPath=%s)" - log.Errorf(strFmt, errStr, fifoFile.ID()) - - return 0, xerrors.New(errStr) - case err := <-errCh: - errStr := "error streaming piece-bytes" - strFmt := "%s (sinkPath=%s)" - log.Errorf(strFmt, errStr, fifoFile.ID()) - - return 0, xerrors.Errorf("%w: %s", errStr, err) - case sectorID := <-sectorIDCh: - return sectorID, nil - } +func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, piecePath string) (uint64, error) { + return sectorbuilder.AddPiece(sb.handle, pieceKey, pieceSize, piecePath) } // TODO: should *really really* return an io.ReadCloser diff --git a/node/impl/storminer.go b/node/impl/storminer.go index bac1da2c8..3219f9943 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,7 +3,6 @@ package impl import ( "context" "fmt" - "io" "io/ioutil" "math/rand" @@ -21,12 +20,20 @@ type StorageMinerAPI struct { } func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { - maxSize := 1016 // this is the most data we can fit in a 1024 byte sector + maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector + data := make([]byte, maxSize) + fi, err := ioutil.TempFile("", "lotus-garbage") + if err != nil { + return 0, err + } + + if _, err := fi.Write(data); err != nil { + return 0, err + } + fi.Close() name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - - sectorId, err := sm.SectorBuilder.AddPiece(ctx, name, uint64(maxSize), - ioutil.NopCloser(io.LimitReader(rand.New(rand.NewSource(rand.Int63())), int64(maxSize)))) + sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) if err != nil { return 0, err }