WIP remote sector CommitSseal

This commit is contained in:
Łukasz Magiera 2019-11-21 20:51:48 +01:00
parent d4197bbadc
commit b726b95298
5 changed files with 75 additions and 7 deletions

View File

@ -97,11 +97,20 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
}
res.Rspco = rspco
// TODO: push cache
if err := w.push("sealed", task.SectorID); err != nil {
return errRes(err)
}
case sectorbuilder.WorkerCommit:
panic("todo")
proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco)
if err != nil {
return errRes(err)
}
res.Proof = proof
// TODO: Push cache
}
return res
@ -167,7 +176,7 @@ func (w *worker) push(typ string, sectorID uint64) error {
bar.Start()
defer bar.Finish()
//todo set content size
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f))
if err != nil {
return err
@ -195,7 +204,8 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType)
case sectorbuilder.WorkerPreCommit:
err = w.fetch("staged", sectorID)
case sectorbuilder.WorkerCommit:
panic("todo")
err = w.fetch("sealed", sectorID)
// todo: cache
}
if err != nil {
return xerrors.Errorf("fetch failed: %w", err)

View File

@ -48,6 +48,8 @@ func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File
switch typ {
case "staged":
return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644)
case "sealed":
return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_RDONLY, 0644)
default:
return nil, xerrors.Errorf("unknown sector type for read: %s", typ)
}

View File

@ -22,6 +22,10 @@ type WorkerTask struct {
// preCommit
SealTicket SealTicket
Pieces []PublicPieceInfo
// commit
SealSeed SealSeed
Rspco RawSealPreCommitOutput
}
type workerCall struct {

View File

@ -387,9 +387,19 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return rspco, nil
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) {
ret := sb.RateLimit()
defer ret()
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
select {
case ret := <-call.ret:
return ret.Proof, ret.Err
case <-sb.stopping:
return nil, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
@ -411,6 +421,41 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
select { // prefer remote
case sb.sealTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
select { // use whichever is available
case sb.sealTasks <- call:
proof, err = sb.sealCommitRemote(call)
case sb.rateLimit <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
pmeta := make([]sectorbuilder.PieceMetadata, len(pieces))
for i, piece := range pieces {
pmeta[i] = sectorbuilder.PieceMetadata{
@ -425,6 +470,11 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
return nil, err
}
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
err = sectorbuilder.ImportSealedSector(
sb.handle,
sectorID,

View File

@ -79,10 +79,12 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
defer fr.Close()
w.WriteHeader(200)
if _, err := io.Copy(w, fr); err != nil {
n, err := io.Copy(w, fr)
if err != nil {
log.Error(err)
return
}
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], n)
}
func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) {