deals: Sealing handler
This commit is contained in:
parent
2194f52852
commit
5ab1b1caaf
@ -29,6 +29,8 @@ type MinerDeal struct {
|
||||
|
||||
Ref cid.Cid
|
||||
|
||||
SectorID uint64 // Set when State >= Staged
|
||||
|
||||
s inet.Stream
|
||||
}
|
||||
|
||||
@ -55,6 +57,7 @@ type dealUpdate struct {
|
||||
newState DealState
|
||||
id cid.Cid
|
||||
err error
|
||||
mut func(*MinerDeal)
|
||||
}
|
||||
|
||||
func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) {
|
||||
@ -136,6 +139,9 @@ func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) {
|
||||
var deal MinerDeal
|
||||
err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error {
|
||||
d.State = update.newState
|
||||
if update.mut != nil {
|
||||
update.mut(d)
|
||||
}
|
||||
deal = *d
|
||||
return nil
|
||||
})
|
||||
@ -150,7 +156,7 @@ func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) {
|
||||
case Staged:
|
||||
h.handle(ctx, deal, h.staged, Sealing)
|
||||
case Sealing:
|
||||
log.Error("TODO")
|
||||
h.handle(ctx, deal, h.sealing, Complete)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,9 @@ package deals
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
@ -9,16 +12,17 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type handlerFunc func(ctx context.Context, deal MinerDeal) error
|
||||
type handlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
|
||||
|
||||
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) {
|
||||
go func() {
|
||||
err := cb(ctx, deal)
|
||||
mut, err := cb(ctx, deal)
|
||||
select {
|
||||
case h.updated <- dealUpdate{
|
||||
newState: next,
|
||||
id: deal.ProposalCid,
|
||||
err: err,
|
||||
mut: mut,
|
||||
}:
|
||||
case <-h.stop:
|
||||
}
|
||||
@ -27,14 +31,13 @@ func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, ne
|
||||
|
||||
// ACCEPTED
|
||||
|
||||
func (h *Handler) accept(ctx context.Context, deal MinerDeal) error {
|
||||
log.Info("acc")
|
||||
func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||
switch deal.Proposal.SerializationMode {
|
||||
//case SerializationRaw:
|
||||
//case SerializationIPLD:
|
||||
case SerializationUnixFs:
|
||||
default:
|
||||
return xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode)
|
||||
return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode)
|
||||
}
|
||||
|
||||
// TODO: check payment
|
||||
@ -46,18 +49,17 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) error {
|
||||
Proposal: deal.ProposalCid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return merkledag.FetchGraph(ctx, deal.Ref, h.dag)
|
||||
return nil, merkledag.FetchGraph(ctx, deal.Ref, h.dag)
|
||||
}
|
||||
|
||||
// STAGED
|
||||
|
||||
func (h *Handler) staged(ctx context.Context, deal MinerDeal) error {
|
||||
func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||
err := h.sendSignedResponse(StorageDealResponse{
|
||||
State: Staged,
|
||||
Message: "",
|
||||
Proposal: deal.ProposalCid,
|
||||
})
|
||||
if err != nil {
|
||||
@ -66,24 +68,24 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) error {
|
||||
|
||||
root, err := h.dag.Get(ctx, deal.Ref)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get file root for deal: %s", err)
|
||||
return nil, xerrors.Errorf("failed to get file root for deal: %s", err)
|
||||
}
|
||||
|
||||
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
||||
n, err := unixfile.NewUnixfsFile(ctx, h.dag, root)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("cannot open unixfs file: %s", err)
|
||||
return nil, xerrors.Errorf("cannot open unixfs file: %s", err)
|
||||
}
|
||||
|
||||
uf, ok := n.(files.File)
|
||||
if !ok {
|
||||
// we probably got directory, unsupported for now
|
||||
return xerrors.Errorf("unsupported unixfs type")
|
||||
return nil, xerrors.Errorf("unsupported unixfs type")
|
||||
}
|
||||
|
||||
size, err := uf.Size()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get file size: %s", err)
|
||||
return nil, xerrors.Errorf("failed to get file size: %s", err)
|
||||
}
|
||||
|
||||
var sectorID uint64
|
||||
@ -92,11 +94,77 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) error {
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("AddPiece failed: %s", err)
|
||||
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
||||
}
|
||||
|
||||
log.Warnf("New Sector: %d", sectorID)
|
||||
return nil
|
||||
return func(deal *MinerDeal) {
|
||||
deal.SectorID = sectorID
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SEALING
|
||||
|
||||
func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (PieceInclusionProof, error) {
|
||||
for i, p := range status.Pieces {
|
||||
if p.Key == ref {
|
||||
return PieceInclusionProof{
|
||||
Position: uint64(i),
|
||||
ProofElements: p.InclusionProof,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
|
||||
}
|
||||
|
||||
func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error){
|
||||
loop:
|
||||
for {
|
||||
status, err = h.sb.SealStatus(deal.SectorID)
|
||||
if err != nil {
|
||||
return sectorbuilder.SectorSealingStatus{}, err
|
||||
}
|
||||
|
||||
switch status.SealStatusCode {
|
||||
case 0: // sealed
|
||||
break loop
|
||||
case 2: // failed
|
||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
|
||||
case 1: // pending
|
||||
if err := h.sb.SealAllStagedSectors(); err != nil {
|
||||
return sectorbuilder.SectorSealingStatus{}, err
|
||||
}
|
||||
// start seal
|
||||
fallthrough
|
||||
case 3: // sealing
|
||||
// wait
|
||||
default:
|
||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||
status, err := h.pollSectorSealed(deal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ip, err := getInclusionProof(deal.Ref.String(), status)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = h.sendSignedResponse(StorageDealResponse{
|
||||
State: Sealing,
|
||||
Proposal: deal.ProposalCid,
|
||||
PieceInclusionProof: ip,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("Sending deal response failed: %s", err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ type SignedStorageDealProposal struct {
|
||||
|
||||
type PieceInclusionProof struct {
|
||||
Position uint64
|
||||
ProofElements [32]byte
|
||||
ProofElements []byte
|
||||
}
|
||||
|
||||
type StorageDealResponse struct {
|
||||
|
@ -96,6 +96,8 @@ func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error {
|
||||
log.Info("committing sector")
|
||||
|
||||
ok, err := sectorbuilder.VerifySeal(1024, sinfo.CommR[:], sinfo.CommD[:], sinfo.CommRStar[:], m.maddr, sinfo.SectorID, sinfo.Proof)
|
||||
if err != nil {
|
||||
log.Error("failed to verify seal we just created: ", err)
|
||||
|
Loading…
Reference in New Issue
Block a user