diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index bd06bd188..235a8e040 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -8,7 +8,6 @@ import ( abi "github.com/filecoin-project/go-state-types/abi" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" - cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) @@ -476,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{184, 27}); err != nil { + if _, err := w.Write([]byte{184, 26}); err != nil { return err } @@ -590,31 +589,6 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.PendingPieces ([]cid.Cid) (slice) - if len("PendingPieces") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"PendingPieces\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingPieces"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("PendingPieces")); err != nil { - return err - } - - if len(t.PendingPieces) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.PendingPieces was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.PendingPieces))); err != nil { - return err - } - for _, v := range t.PendingPieces { - if err := cbg.WriteCidBuf(scratch, w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.PendingPieces: %w", err) - } - } - // t.TicketValue (abi.SealRandomness) (slice) if len("TicketValue") > cbg.MaxLength { return xerrors.Errorf("Value in field \"TicketValue\" was too long") @@ -1211,35 +1185,6 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Pieces[i] = v } - // t.PendingPieces ([]cid.Cid) (slice) - case "PendingPieces": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - - if extra > cbg.MaxLength { - return fmt.Errorf("t.PendingPieces: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.PendingPieces = make([]cid.Cid, extra) - } - - for i := 0; i < int(extra); i++ { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("reading cid field t.PendingPieces failed: %w", err) - } - t.PendingPieces[i] = c - } - // t.TicketValue (abi.SealRandomness) (slice) case "TicketValue": diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index b74154f07..98cf18308 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -79,15 +79,12 @@ func (evt SectorStartCC) apply(state *SectorInfo) { state.SectorType = evt.SectorType } -type SectorAddPiece struct { - NewPiece cid.Cid -} +type SectorAddPiece struct{} func (evt SectorAddPiece) apply(state *SectorInfo) { if state.CreationTime == 0 { state.CreationTime = time.Now().Unix() } - state.PendingPieces = append(state.PendingPieces, evt.NewPiece) } type SectorPieceAdded struct { @@ -96,7 +93,6 @@ type SectorPieceAdded struct { func (evt SectorPieceAdded) apply(state *SectorInfo) { state.Pieces = append(state.Pieces, evt.NewPieces...) - state.PendingPieces = nil } type SectorStartPacking struct{} diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index f9325c688..5fa2043b1 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -76,7 +76,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e // todo check deal expiration - return ctx.Send(SectorAddPiece{cid}) + sid := m.minerSectorID(sector.SectorNumber) + m.assignedPieces[sid] = append(m.assignedPieces[sid], cid) + + return ctx.Send(SectorAddPiece{}) }, } @@ -96,12 +99,20 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return err } - m.inputLk.Lock() - delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) // todo: do this when handling the event - m.inputLk.Unlock() - res := SectorPieceAdded{} + m.inputLk.Lock() + + pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)] + if ok { + delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber)) + } + m.inputLk.Unlock() + if !ok { + // nothing to do here + return ctx.Send(res) + } + var offset abi.UnpaddedPieceSize pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) for i, p := range sector.Pieces { @@ -114,13 +125,11 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return xerrors.Errorf("getting per-sector deal limit: %w", err) } - for i, piece := range sector.PendingPieces { + for i, piece := range pending { m.inputLk.Lock() deal, ok := m.pendingPieces[piece] m.inputLk.Unlock() if !ok { - // todo: this probably means that the miner process was restarted in the middle of adding pieces. - // Truncate whatever was in process of being added to the sector (keep sector.Pieces as those are cleanly added, then go to WaitDeals) return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) } @@ -145,7 +154,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er p.Unpadded(), NewNullReader(p.Unpadded())) if err != nil { - return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + err = xerrors.Errorf("writing padding piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return err // todo failed state } pieceSizes = append(pieceSizes, p.Unpadded()) @@ -160,7 +171,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er deal.size, deal.data) if err != nil { - return xerrors.Errorf("writing padding piece: %w", err) // todo failed state + err = xerrors.Errorf("writing piece: %w", err) + deal.accepted(sector.SectorNumber, offset, err) + return err // todo failed state } deal.accepted(sector.SectorNumber, offset, nil) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 995c6ceec..aa1a73e6c 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -84,10 +84,11 @@ type Sealing struct { verif ffiwrapper.Verifier pcp PreCommitPolicy - inputLk sync.Mutex - openSectors map[abi.SectorID]*openSector - sectorTimers map[abi.SectorID]*time.Timer - pendingPieces map[cid.Cid]*pendingPiece + inputLk sync.Mutex + openSectors map[abi.SectorID]*openSector + sectorTimers map[abi.SectorID]*time.Timer + pendingPieces map[cid.Cid]*pendingPiece + assignedPieces map[abi.SectorID][]cid.Cid upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} @@ -111,7 +112,7 @@ type FeeConfig struct { type openSector struct { used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors - maybeAccept func(cid.Cid) error + maybeAccept func(cid.Cid) error // called with inputLk } type pendingPiece struct { @@ -136,10 +137,11 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds verif: verif, pcp: pcp, - openSectors: map[abi.SectorID]*openSector{}, - sectorTimers: map[abi.SectorID]*time.Timer{}, - pendingPieces: map[cid.Cid]*pendingPiece{}, - toUpgrade: map[abi.SectorNumber]struct{}{}, + openSectors: map[abi.SectorID]*openSector{}, + sectorTimers: map[abi.SectorID]*time.Timer{}, + pendingPieces: map[cid.Cid]*pendingPiece{}, + assignedPieces: map[abi.SectorID][]cid.Cid{}, + toUpgrade: map[abi.SectorNumber]struct{}{}, notifee: notifee, addrSel: as, diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index d9c54479e..b6cc2afdb 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -70,9 +70,8 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing - CreationTime int64 // unix seconds - Pieces []Piece - PendingPieces []cid.Cid + CreationTime int64 // unix seconds + Pieces []Piece // PreCommit1 TicketValue abi.SealRandomness