fix rspco serialization

This commit is contained in:
Łukasz Magiera 2019-11-22 16:48:02 +01:00
parent b726b95298
commit 3281e9448a
3 changed files with 50 additions and 20 deletions

View File

@ -84,7 +84,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
}
if err := w.fetchSector(task.SectorID, task.Type); err != nil {
return errRes(err)
return errRes(xerrors.Errorf("fetching sector: %w", err))
}
var res sectorbuilder.SealRes
@ -93,19 +93,19 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces)
if err != nil {
return errRes(err)
return errRes(xerrors.Errorf("precomitting: %w", err))
}
res.Rspco = rspco
res.Rspco = rspco.ToJson()
// TODO: push cache
if err := w.push("sealed", task.SectorID); err != nil {
return errRes(err)
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
case sectorbuilder.WorkerCommit:
proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco)
if err != nil {
return errRes(err)
return errRes(xerrors.Errorf("comitting: %w", err))
}
res.Proof = proof
@ -191,10 +191,6 @@ func (w *worker) push(typ string, sectorID uint64) error {
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
}
if err := f.Close(); err != nil {
return err
}
return resp.Body.Close()
}

View File

@ -43,7 +43,7 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput
type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput
const CommLen = sectorbuilder.CommitmentBytesLen
@ -74,11 +74,36 @@ type SectorBuilder struct {
stopping chan struct{}
}
type SealRes struct {
Err error `json:"omitempty"`
type JsonRSPCO struct {
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
}
Proof []byte `json:"omitempty"`
Rspco RawSealPreCommitOutput `json:"omitempty"`
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{
CommC: rspco.CommC[:],
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
CommRLast: rspco.CommRLast[:],
}
}
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
var out RawSealPreCommitOutput
copy(out.CommC[:], rspco.CommC)
copy(out.CommD[:], rspco.CommD)
copy(out.CommR[:], rspco.CommR)
copy(out.CommRLast[:], rspco.CommRLast)
return out
}
type SealRes struct {
Err error
Proof []byte
Rspco JsonRSPCO
}
type remote struct {
@ -310,7 +335,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
select {
case ret := <-call.ret:
return ret.Rspco, ret.Err
return ret.Rspco.rspco(), ret.Err
case <-sb.stopping:
return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped")
}
@ -350,12 +375,12 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
}
var sum uint64
@ -384,7 +409,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return rspco, nil
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
@ -415,9 +440,12 @@ func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, see
ticket.TicketBytes,
seed.TicketBytes,
pieces,
rspco,
sectorbuilder.RawSealPreCommitOutput(rspco),
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
@ -456,6 +484,10 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
return nil, xerrors.Errorf("commit: %w", err)
}
if pieceKeys == nil {
return
}
pmeta := make([]sectorbuilder.PieceMetadata, len(pieces))
for i, piece := range pieces {
pmeta[i] = sectorbuilder.PieceMetadata{

View File

@ -79,7 +79,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
defer fr.Close()
w.WriteHeader(200)
n, err := io.Copy(w, fr)
n, err := io.Copy(fr, r.Body)
if err != nil {
log.Error(err)
return
@ -162,6 +162,8 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde
}
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
log.Infof("WDUN RSPKO %v", res.Rspco)
return sm.SectorBuilder.TaskDone(ctx, task, res)
}