docs, logs and green ci

This commit is contained in:
aarshkshah1992 2021-05-18 17:05:25 +05:30 committed by Dirk McCormick
parent 2a40c802ea
commit 73613ee883
9 changed files with 116 additions and 23 deletions

View File

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strconv"
@ -453,14 +454,23 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), sectorstorage.SealerConfig{
si := stores.NewIndex()
lstor, err := stores.NewLocal(ctx, lr, si, nil)
if err != nil {
return err
}
stor := stores.NewRemote(lstor, si, http.Header(sa), 10)
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
AllowUnseal: true,
}, nil, sa, wsts, smsts)
}, wsts, smsts)
if err != nil {
return err
}

View File

@ -103,19 +103,13 @@ type StorageAuth http.Header
type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil {
return nil, err
}
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
m := &Manager{
ls: ls,
storage: stor,
@ -204,6 +198,10 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy
}
}
// SectorsUnsealPiece will Unseal the Sealed sector file for the given sector.
// It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in
// one of it's sealing scratch spaces to store them after fetching them from another worker.
// If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again.
func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -213,6 +211,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
// put it in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
@ -231,6 +231,8 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
return xerrors.Errorf("getting sector size: %w", err)
}
// selector will schedule the Unseal task on a worker that either already has the sealed sector files or has space in
// one of it's sealing scratch spaces to store them after fetching them from another worker.
selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true)
log.Debugf("schedule unseal for sector %d", sector.ID)
@ -241,12 +243,14 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorR
// unseal the sector partially. Requesting the whole sector here can
// save us some work in case another piece is requested from here
log.Debugf("unseal sector %d", sector.ID)
// Note: This unsealed call will essentially become a no-op of the worker already has an Unsealed sector file for the given sector.
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed))
log.Debugf("completed unseal sector %d", sector.ID)
return err
})
if err != nil {
return err
return xerrors.Errorf("worker UnsealPiece call: %s", err)
}
return nil

View File

@ -6,6 +6,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
@ -372,13 +373,12 @@ func generateFakePoSt(sectorInfo []proof2.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if offset != 0 {
panic("implme")
}
_, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID.ID].pieces[0]]), int64(size))
return err
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
@ -489,6 +489,10 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID,
panic("not supported")
}
func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error {
return nil
}
func (m mockVerif) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -17,10 +17,12 @@ import (
)
type Unsealer interface {
// SectorsUnsealPiece will Unseal a Sealed sector file for the given sector.
SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}
type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
}
@ -38,6 +40,10 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse
}
}
// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Will return a nil reader if the piece does NOT exist in any unsealed file/there is not unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
@ -58,6 +64,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
return r, cancel, nil
}
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
@ -68,6 +77,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
err = nil
}
if err != nil {
@ -85,6 +95,8 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}
log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
if err != nil {
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
@ -92,6 +104,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
if r == nil {
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
} else {
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
@ -99,6 +114,8 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
close: func() error {

View File

@ -59,6 +59,8 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
}
}
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)
@ -129,8 +131,11 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
}
} else {
w.Header().Set("Content-Type", "application/octet-stream")
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
http.ServeFile(w, r, path)
}
log.Debugf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s", id, ft, path)
}
func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) {
@ -158,6 +163,9 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
}
}
// remoteGetAllocated returns `http.StatusOK` if the worker already has an Unsealed sector file
// containing the Unsealed piece sent in the request.
// returns `http.StatusRequestedRangeNotSatisfiable` otherwise.
func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE Alloc check %s", r.URL)
vars := mux.Vars(r)
@ -216,6 +224,8 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R
ProofType: 0,
}
// get the path of the local Unsealed file for the given sector.
// return error if we do NOT have it.
paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Errorf("%+v", err)
@ -230,6 +240,7 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R
return
}
// open the Unsealed file and check if it has the Unsealed sector for the piece at the given offset and size.
pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
log.Error("opening partial file: ", err)
@ -250,9 +261,12 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R
}
if has {
log.Debugf("returning ok: worker has unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi)
w.WriteHeader(http.StatusOK)
return
}
log.Debugf("returning StatusRequestedRangeNotSatisfiable: worker does NOT have unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi)
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}

View File

@ -479,10 +479,19 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
return resp.Body, nil
}
// Reader gets a reader for unsealed file range. Can return nil in case the requested range isn't allocated in the file
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file
// to know if they have the unsealed piece and will then read the unsealed piece data from a worker that has it.
//
// Returns a nil reader if :
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
@ -490,7 +499,11 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
path := storiface.PathByType(paths, ft)
var rd io.ReadCloser
if path == "" {
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
// if they have the unsealed piece in the unsealed sector file.
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return nil, err
@ -500,7 +513,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more preferred to store ?
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
@ -508,6 +521,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
iloop:
for _, info := range si {
for _, url := range info.URLs {
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
// unsealed piece in their unsealed sector file.
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
@ -517,6 +532,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
continue
}
// readRemote fetches a reader that we can used to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err = r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
@ -527,17 +544,22 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
}
}
} else {
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
// unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
}
// open the unsealed sector file for the given sector size located at the given path.
pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
// in the unsealed sector file. That is what `HasAllocated` checks for.
has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return nil, xerrors.Errorf("has allocated: %w", err)
@ -547,10 +569,10 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if err := pf.Close(); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
}
log.Debugf("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return pf.Reader(storiface.PaddedByteIndex(offset), size)
}

View File

@ -375,6 +375,8 @@ var MinerNode = Options(
Override(new(*stores.Index), stores.NewIndex),
Override(new(stores.SectorIndex), From(new(*stores.Index))),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(*stores.Local), modules.LocalStorage),
Override(new(*stores.Remote), modules.RemoteStorage),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))),

View File

@ -635,7 +635,6 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full v1api.FullNode,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
@ -660,13 +659,22 @@ func RetrievalProvider(h host.Host,
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls")
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls sectorstorage.URLs) (*stores.Local, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
return stores.NewLocal(ctx, ls, si, urls)
}
func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
}
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
sst, err := sectorstorage.New(ctx, ls, si, sc, urls, sa, wsts, smsts)
sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts)
if err != nil {
return nil, err
}

View File

@ -485,11 +485,16 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
}
fulls[i].Stb = storageBuilder(fulls[i], mn, node.Options(
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(nil), nil
}),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Unset(new(*sectorstorage.Manager)),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
))
}
@ -523,11 +528,18 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
opts = node.Options()
}
storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options(
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(sectors), nil
}),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Unset(new(*sectorstorage.Manager)),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
opts,
))