update retrievaladapter ; add piece_provider

This commit is contained in:
Anton Evangelatov 2021-05-20 12:49:44 +02:00
parent c12d802811
commit cb603c62d9
6 changed files with 288 additions and 49 deletions

View File

@ -29,8 +29,6 @@ var log = logging.Logger("advmgr")
var ErrNoWorkers = errors.New("no suitable workers found")
type URLs []string
type Worker interface {
storiface.WorkerCalls
@ -47,8 +45,6 @@ type Worker interface {
}
type SectorManager interface {
ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer
storage.Prover
storiface.WorkerReturn
@ -804,3 +800,4 @@ func (m *Manager) Close(ctx context.Context) error {
}
var _ SectorManager = &Manager{}
var _ Unsealer = &Manager{}

110
extern/sector-storage/piece_provider.go vendored Normal file
View File

@ -0,0 +1,110 @@
package sectorstorage
import (
"bufio"
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type Unsealer interface {
SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}
type PieceProvider struct {
storage *stores.Remote
index stores.SectorIndex
uns Unsealer
}
func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unsealer) *PieceProvider {
return &PieceProvider{
storage: storage,
index: index,
uns: uns,
}
}
func (p *PieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded(), storiface.FTUnsealed)
if err != nil {
cancel()
return nil, nil, err
}
if r == nil {
cancel()
}
return r, cancel, nil
}
func (p *PieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
if err != nil {
return nil, false, err
}
var uns bool
if r == nil {
uns = true
commd := &unsealed
if unsealed == cid.Undef {
commd = nil
}
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
if err != nil {
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
}
if r == nil {
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
}
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/bits"
@ -15,6 +16,7 @@ import (
"sort"
"sync"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
@ -293,6 +295,148 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
}
}
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return false, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth.Clone()
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint
switch resp.StatusCode {
case http.StatusOK:
return true, nil
case http.StatusRequestedRangeNotSatisfiable:
return false, nil
default:
return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode)
}
}
func (r *Remote) readRemote(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
if len(r.limit) >= cap(r.limit) {
log.Infof("Throttling remote read, %d already running", len(r.limit))
}
// TODO: Smarter throttling
// * Priority (just going sequentially is still pretty good)
// * Per interface
// * Aware of remote load
select {
case r.limit <- struct{}{}:
defer func() { <-r.limit }()
case <-ctx.Done():
return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth.Clone()
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close() // nolint
return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
return resp.Body, nil
}
// Reated gets a reader for unsealed file range. Can return nil in case the requested range isn't allocated in the file
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize, ft storiface.SectorFileType) (io.ReadCloser, error) {
if ft != storiface.FTUnsealed {
return nil, xerrors.Errorf("reader only supports unsealed files")
}
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
var rd io.ReadCloser
if path == "" {
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
iloop:
for _, info := range si {
for _, url := range info.URLs {
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}
rd, err = r.readRemote(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
break iloop
}
}
} else {
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
}
pf, err := ffiwrapper.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return nil, xerrors.Errorf("has allocated: %w", err)
}
if !has {
if err := pf.Close(); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
}
return pf.Reader(storiface.PaddedByteIndex(offset), size)
}
// note: rd can be nil
return rd, nil
}
func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
// Make sure we have the data local
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)

View File

@ -5,6 +5,9 @@ import (
"io"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
@ -13,7 +16,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -25,15 +27,16 @@ import (
var log = logging.Logger("retrievaladapter")
type retrievalProviderNode struct {
miner *storage.Miner
sealer sectorstorage.SectorManager
full v1api.FullNode
maddr address.Address
secb sectorblocks.SectorBuilder
pp *sectorstorage.PieceProvider
full v1api.FullNode
}
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sealer sectorstorage.SectorManager, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sealer, full}
func NewRetrievalProviderNode(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp *sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{address.Address(maddr), secb, pp, full}
}
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
@ -47,14 +50,12 @@ func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, min
}
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
si, err := rpn.miner.GetSectorInfo(sectorID)
si, err := rpn.secb.SectorsStatus(ctx, sectorID, false)
if err != nil {
return nil, err
}
mid, err := address.IDFromAddress(rpn.miner.Address())
mid, err := address.IDFromAddress(rpn.maddr)
if err != nil {
return nil, err
}
@ -64,27 +65,20 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi
Miner: abi.ActorID(mid),
Number: sectorID,
},
ProofType: si.SectorType,
ProofType: si.SealProof,
}
// Set up a pipe so that data can be written from the unsealing process
// into the reader returned by this function
r, w := io.Pipe()
go func() {
var commD cid.Cid
if si.CommD != nil {
commD = *si.CommD
}
var commD cid.Cid
if si.CommD != nil {
commD = *si.CommD
}
// Read the piece into the pipe's writer, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
if err != nil {
log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err)
}
// Close the reader with any error that was returned while reading the piece
_ = w.CloseWithError(err)
}()
// Read the piece into the pipe's writer, unsealing the piece if necessary
r, unsealed, err := rpn.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.Ticket.Value, commD)
if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
}
_ = unsealed // todo: use
return r, nil
}

View File

@ -490,10 +490,10 @@ func ConfigCommon(cfg *config.Common) Option {
Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
return lr.SetAPIEndpoint(e)
}),
Override(new(sectorstorage.URLs), func(e dtypes.APIEndpoint) (sectorstorage.URLs, error) {
Override(new(stores.URLs), func(e dtypes.APIEndpoint) (stores.URLs, error) {
ip := cfg.API.RemoteListenAddress
var urls sectorstorage.URLs
var urls stores.URLs
urls = append(urls, "http://"+ip+"/remote") // TODO: This makes no assumptions, and probably could...
return urls, nil
}),

View File

@ -67,7 +67,6 @@ import (
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
lotusminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -632,11 +631,15 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt
}
}
func RetrievalNetwork(h host.Host) rmnet.RetrievalMarketNetwork {
return rmnet.NewFromLibp2pHost(h)
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full v1api.FullNode,
func RetrievalProvider(
maddr dtypes.MinerAddress,
adapter retrievalmarket.RetrievalProviderNode,
netwk rmnet.RetrievalMarketNetwork,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
@ -645,17 +648,8 @@ func RetrievalProvider(h host.Host,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
netwk := rmnet.NewFromLibp2pHost(h)
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
return retrievalimpl.NewProvider(address.Address(maddr), adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
}
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")