deals: Use temp files for AddPiece

This commit is contained in:
Łukasz Magiera 2019-08-07 14:41:27 +02:00 committed by whyrusleeping
parent 5e176eed78
commit 3e2d04a540
4 changed files with 43 additions and 197 deletions

View File

@ -10,7 +10,10 @@ import (
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"io"
"io/ioutil"
"math" "math"
"os"
"github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/lib/cborrpc"
@ -147,7 +150,7 @@ func (h *Handler) Run(ctx context.Context) {
continue continue
} }
f, ok := n.(files.File) uf, ok := n.(files.File)
if !ok { if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode? // TODO: we probably got directory, how should we handle this in unixfs mode?
log.Errorf("unsupported unixfs type") log.Errorf("unsupported unixfs type")
@ -155,20 +158,42 @@ func (h *Handler) Run(ctx context.Context) {
continue continue
} }
size, err := f.Size() size, err := uf.Size()
if err != nil { if err != nil {
log.Errorf("failed to get file size: %s", err) log.Errorf("failed to get file size: %s", err)
// TODO: fail deal // TODO: fail deal
continue continue
} }
// TODO: can we use pipes? //////////////
sectorID, err := h.sb.AddPiece(ctx, deal.Proposal.PieceRef, uint64(size), f)
f, err := ioutil.TempFile(os.TempDir(), "piece-temp-")
if err != nil {
log.Error(err)
// TODO: fail deal
continue
}
if _, err := io.Copy(f, uf); err != nil {
log.Error(err)
// TODO: fail deal
continue
}
if err := f.Close(); err != nil {
log.Error(err)
// TODO: fail deal
continue
}
sectorID, err := h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f.Name())
if err != nil { if err != nil {
// TODO: fail deal // TODO: fail deal
log.Errorf("AddPiece failed: %s", err) log.Errorf("AddPiece failed: %s", err)
continue continue
} }
if err := os.Remove(f.Name()); err != nil {
log.Error(err)
// TODO: fail deal
continue
}
log.Warnf("New Sector: %d", sectorID) log.Warnf("New Sector: %d", sectorID)

View File

@ -1,106 +0,0 @@
// +build !windows
// TODO: windows now has pipes, verify if it maybe works
// TODO: extract from filecoin
package bytesink
import (
"fmt"
"io/ioutil"
"os"
"syscall"
"github.com/pkg/errors"
)
// FifoByteSink is not safe for concurrent access, as writes to underlying pipe are atomic only
// if len(buf) is less than the OS-specific PIPE_BUF value.
type FifoByteSink struct {
file *os.File
path string
}
// Open prepares the sink for writing by opening the backing FIFO file. Open
// will block until someone opens the FIFO file for reading.
func (s *FifoByteSink) Open() error {
file, err := os.OpenFile(s.path, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
return errors.Wrap(err, "failed to open pipe")
}
s.file = file
return nil
}
// Write writes the provided buffer to the underlying pipe. Write will block
// until the provided buffer's bytes have been read from the read end of the
// pipe.
//
// Warning: Writes are atomic only if len(buf) is less than the OS-specific
// PIPE_BUF value. For more information, see:
//
// http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
func (s *FifoByteSink) Write(buf []byte) (int, error) {
return s.file.Write(buf)
}
// Close ensures that the underlying file is closed and removed.
func (s *FifoByteSink) Close() (retErr error) {
cerr := s.file.Close()
if cerr != nil {
return cerr
}
defer func() {
rerr := os.Remove(s.path)
if retErr == nil {
retErr = rerr
}
}()
return
}
// ID produces a string-identifier for this byte sink. For now, this is just the
// path of the FIFO file. This string may get more structured in the future.
func (s *FifoByteSink) ID() string {
return s.path
}
// NewFifo creates a FIFO pipe and returns a pointer to a FifoByteSink, which
// satisfies the ByteSink interface. The FIFO pipe is used to stream bytes to
// rust-fil-proofs from Go during the piece-adding flow. Writes to the pipe are
// buffered automatically by the OS; the size of the buffer varies.
func NewFifo() (*FifoByteSink, error) {
path, err := createTmpFifoPath()
if err != nil {
return nil, errors.Wrap(err, "creating FIFO path failed")
}
err = syscall.Mkfifo(path, 0600)
if err != nil {
return nil, errors.Wrap(err, "mkfifo failed")
}
return &FifoByteSink{
path: path,
}, nil
}
// createTmpFifoPath creates a path with which a temporary FIFO file may be
// created.
func createTmpFifoPath() (string, error) {
file, err := ioutil.TempFile("", "")
if err != nil {
return "", err
}
err = file.Close()
if err != nil {
return "", err
}
return fmt.Sprintf("%s.fifo", file.Name()), nil
}

View File

@ -3,17 +3,13 @@ package sectorbuilder
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"io"
"unsafe" "unsafe"
"golang.org/x/xerrors"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/lib/bytesink"
) )
var log = logging.Logger("sectorbuilder") var log = logging.Logger("sectorbuilder")
@ -72,84 +68,8 @@ func (sb *SectorBuilder) Destroy() {
sectorbuilder.DestroySectorBuilder(sb.handle) sectorbuilder.DestroySectorBuilder(sb.handle)
} }
func (sb *SectorBuilder) AddPiece(ctx context.Context, pieceRef string, pieceSize uint64, pieceReader io.ReadCloser) (uint64, error) { func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, piecePath string) (uint64, error) {
fifoFile, err := bytesink.NewFifo() return sectorbuilder.AddPiece(sb.handle, pieceKey, pieceSize, piecePath)
if err != nil {
return 0, err
}
// errCh holds any error encountered when streaming bytes or making the CGO
// call. The channel is buffered so that the goroutines can exit, which will
// close the pipe, which unblocks the CGO call.
errCh := make(chan error, 2)
// sectorIDCh receives a value if the CGO call indicates that the client
// piece has successfully been added to a sector. The channel is buffered
// so that the goroutine can exit if a value is sent to errCh before the
// CGO call completes.
sectorIDCh := make(chan uint64, 1)
// goroutine attempts to copy bytes from piece's reader to the fifoFile
go func() {
// opening the fifoFile blocks the goroutine until a reader is opened on the
// other end of the FIFO pipe
err := fifoFile.Open()
if err != nil {
errCh <- xerrors.Errorf("failed to open fifoFile: %w", err)
return
}
// closing theg s fifoFile signals to the reader that we're done writing, which
// unblocks the reader
defer func() {
err := fifoFile.Close()
if err != nil {
log.Warnf("failed to close fifoFile: %s", err)
}
}()
n, err := io.Copy(fifoFile, pieceReader)
if err != nil {
errCh <- xerrors.Errorf("failed to copy to pipe: %w", err)
return
}
if uint64(n) != pieceSize {
errCh <- xerrors.Errorf("expected to write %d bytes but wrote %d", pieceSize, n)
return
}
}()
// goroutine makes CGO call, which blocks until FIFO pipe opened for writing
// from within other goroutine
go func() {
id, err := sectorbuilder.AddPiece(sb.handle, pieceRef, pieceSize, fifoFile.ID())
if err != nil {
msg := "CGO add_piece returned an error (err=%s, fifo path=%s)"
log.Errorf(msg, err, fifoFile.ID())
errCh <- err
return
}
sectorIDCh <- id
}()
select {
case <-ctx.Done():
errStr := "context completed before CGO call could return"
strFmt := "%s (sinkPath=%s)"
log.Errorf(strFmt, errStr, fifoFile.ID())
return 0, xerrors.New(errStr)
case err := <-errCh:
errStr := "error streaming piece-bytes"
strFmt := "%s (sinkPath=%s)"
log.Errorf(strFmt, errStr, fifoFile.ID())
return 0, xerrors.Errorf("%w: %s", errStr, err)
case sectorID := <-sectorIDCh:
return sectorID, nil
}
} }
// TODO: should *really really* return an io.ReadCloser // TODO: should *really really* return an io.ReadCloser

View File

@ -3,7 +3,6 @@ package impl
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -21,12 +20,20 @@ type StorageMinerAPI struct {
} }
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
maxSize := 1016 // this is the most data we can fit in a 1024 byte sector maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector
data := make([]byte, maxSize)
fi, err := ioutil.TempFile("", "lotus-garbage")
if err != nil {
return 0, err
}
if _, err := fi.Write(data); err != nil {
return 0, err
}
fi.Close()
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name())
sectorId, err := sm.SectorBuilder.AddPiece(ctx, name, uint64(maxSize),
ioutil.NopCloser(io.LimitReader(rand.New(rand.NewSource(rand.Int63())), int64(maxSize))))
if err != nil { if err != nil {
return 0, err return 0, err
} }