diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index db7c70091..e8be9085c 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -84,7 +84,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) } if err := w.fetchSector(task.SectorID, task.Type); err != nil { - return errRes(err) + return errRes(xerrors.Errorf("fetching sector: %w", err)) } var res sectorbuilder.SealRes @@ -93,19 +93,19 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) case sectorbuilder.WorkerPreCommit: rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces) if err != nil { - return errRes(err) + return errRes(xerrors.Errorf("precomitting: %w", err)) } - res.Rspco = rspco + res.Rspco = rspco.ToJson() // TODO: push cache if err := w.push("sealed", task.SectorID); err != nil { - return errRes(err) + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } case sectorbuilder.WorkerCommit: proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco) if err != nil { - return errRes(err) + return errRes(xerrors.Errorf("comitting: %w", err)) } res.Proof = proof @@ -191,10 +191,6 @@ func (w *worker) push(typ string, sectorID uint64) error { return xerrors.Errorf("non-200 response: %d", resp.StatusCode) } - if err := f.Close(); err != nil { - return err - } - return resp.Body.Close() } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index f017635ad..941728e0a 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -43,7 +43,7 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput type PublicPieceInfo = sectorbuilder.PublicPieceInfo -type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput +type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput const CommLen = sectorbuilder.CommitmentBytesLen @@ -74,11 +74,36 @@ type SectorBuilder struct { stopping chan struct{} } -type SealRes struct { - Err error `json:"omitempty"` +type JsonRSPCO struct { + CommC []byte + CommD []byte + CommR []byte + CommRLast []byte +} - Proof []byte `json:"omitempty"` - Rspco RawSealPreCommitOutput `json:"omitempty"` +func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { + return JsonRSPCO{ + CommC: rspco.CommC[:], + CommD: rspco.CommD[:], + CommR: rspco.CommR[:], + CommRLast: rspco.CommRLast[:], + } +} + +func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { + var out RawSealPreCommitOutput + copy(out.CommC[:], rspco.CommC) + copy(out.CommD[:], rspco.CommD) + copy(out.CommR[:], rspco.CommR) + copy(out.CommRLast[:], rspco.CommRLast) + return out +} + +type SealRes struct { + Err error + + Proof []byte + Rspco JsonRSPCO } type remote struct { @@ -310,7 +335,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { select { case ret := <-call.ret: - return ret.Rspco, ret.Err + return ret.Rspco.rspco(), ret.Err case <-sb.stopping: return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped") } @@ -350,12 +375,12 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { - return RawSealPreCommitOutput{}, err + return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err) } sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { - return RawSealPreCommitOutput{}, err + return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err) } var sum uint64 @@ -384,7 +409,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) } - return rspco, nil + return RawSealPreCommitOutput(rspco), nil } func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { @@ -415,9 +440,12 @@ func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, see ticket.TicketBytes, seed.TicketBytes, pieces, - rspco, + sectorbuilder.RawSealPreCommitOutput(rspco), ) if err != nil { + log.Warn("StandaloneSealCommit error: ", err) + log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco) + return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) } @@ -456,6 +484,10 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea return nil, xerrors.Errorf("commit: %w", err) } + if pieceKeys == nil { + return + } + pmeta := make([]sectorbuilder.PieceMetadata, len(pieces)) for i, piece := range pieces { pmeta[i] = sectorbuilder.PieceMetadata{ diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b1b4f9f82..083097827 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -79,7 +79,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques defer fr.Close() w.WriteHeader(200) - n, err := io.Copy(w, fr) + n, err := io.Copy(fr, r.Body) if err != nil { log.Error(err) return @@ -162,6 +162,8 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { + log.Infof("WDUN RSPKO %v", res.Rspco) + return sm.SectorBuilder.TaskDone(ctx, task, res) }