diff --git a/go.mod b/go.mod index 1f4aab4fd..aed796cc7 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 - github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 + github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 github.com/filecoin-project/specs-actors v0.7.1 github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea github.com/ipfs/go-cid v0.0.5 diff --git a/go.sum b/go.sum index 0507875d4..86e5cf1bc 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060 h1:/3 github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw= github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU= github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= -github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60= -github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw= +github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= @@ -55,13 +55,14 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= -github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg= -github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY= +github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 h1:miw6hiusb/MkV1ryoqUKKWnvHhPW00AYtyeCj0L8pqo= +github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0= github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= +github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= github.com/filecoin-project/specs-actors v0.7.1 h1:/zW++MN4gGIPvG+s0zmSI97k0Z/aaeiREjLC10gQbco= github.com/filecoin-project/specs-actors v0.7.1/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws= github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY= diff --git a/sealing.go b/sealing.go index f5f3fb480..ce0506560 100644 --- a/sealing.go +++ b/sealing.go @@ -55,8 +55,8 @@ type Sealing struct { sc SectorIDCounter verif ffiwrapper.Verifier - unsealedInfos map[abi.SectorNumber]UnsealedSectorInfo - pcp PreCommitPolicy + pcp PreCommitPolicy + unsealedInfoMap UnsealedSectorMap upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} @@ -64,8 +64,13 @@ type Sealing struct { getSealDelay GetSealingDelayFunc } +type UnsealedSectorMap struct { + infos map[abi.SectorNumber]UnsealedSectorInfo + mux sync.Mutex +} + type UnsealedSectorInfo struct { - // stored should always equal sum of pieceSizes + // stored should always equal sum of pieceSizes.Padded() stored uint64 pieceSizes []abi.UnpaddedPieceSize } @@ -75,12 +80,15 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc api: api, events: events, - maddr: maddr, - sealer: sealer, - sc: sc, - verif: verif, - unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo), - pcp: pcp, + maddr: maddr, + sealer: sealer, + sc: sc, + verif: verif, + pcp: pcp, + unsealedInfoMap: UnsealedSectorMap{ + infos: make(map[abi.SectorNumber]UnsealedSectorInfo), + mux: sync.Mutex{}, + }, toUpgrade: map[abi.SectorNumber]struct{}{}, getSealDelay: gsd, @@ -113,21 +121,23 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return 0, 0, xerrors.Errorf("piece cannot fit into a sector") } - sid, err := m.getAvailableSector(size) + m.unsealedInfoMap.mux.Lock() + defer m.unsealedInfoMap.mux.Unlock() + + sid, pads, err := m.getSectorAndPadding(size) if err != nil { - return 0, 0, xerrors.Errorf("creating new sector: %w", err) + return 0, 0, xerrors.Errorf("getting available sector: %w", err) } - offset := m.unsealedInfos[sid].stored - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sid), m.unsealedInfos[sid].pieceSizes, size, r) - if err != nil { - return 0, 0, xerrors.Errorf("writing piece: %w", err) + for _, p := range pads { + err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil) + if err != nil { + return 0, 0, xerrors.Errorf("writing pads: %w", err) + } } - err = m.addPiece(sid, Piece{ - Piece: ppi, - DealInfo: &d, - }) + offset := m.unsealedInfoMap.infos[sid].stored + err = m.addPiece(ctx, sid, size, r, &d) if err != nil { return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) @@ -136,16 +146,26 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return sid, offset, nil } -func (m *Sealing) addPiece(sectorID abi.SectorNumber, piece Piece) error { +// Caller should hold m.unsealedInfoMap.mux +func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { log.Infof("Adding piece to sector %d", sectorID) - err := m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) + if err != nil { + return xerrors.Errorf("writing piece: %w", err) + } + piece := Piece{ + Piece: ppi, + DealInfo: di, + } + + err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) if err != nil { return err } - ui := m.unsealedInfos[sectorID] - m.unsealedInfos[sectorID] = UnsealedSectorInfo{ - stored: ui.stored + uint64(piece.Piece.Size.Unpadded()), + ui := m.unsealedInfoMap.infos[sectorID] + m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{ + stored: ui.stored + uint64(piece.Piece.Size), pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), } @@ -156,6 +176,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } +// Caller should NOT hold m.unsealedInfoMap.mux func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { log.Infof("Starting packing sector %d", sectorID) err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) @@ -163,23 +184,34 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { return err } - delete(m.unsealedInfos, sectorID) + m.unsealedInfoMap.mux.Lock() + delete(m.unsealedInfoMap.infos, sectorID) + m.unsealedInfoMap.mux.Unlock() return nil } -func (m *Sealing) getAvailableSector(size abi.UnpaddedPieceSize) (abi.SectorNumber, error) { +// Caller should hold m.unsealedInfoMap.mux +func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { ss := m.sealer.SectorSize() - for k, v := range m.unsealedInfos { - if v.stored+uint64(size) <= uint64(ss) { - // TODO: Support multiple deal sizes in the same sector - if len(v.pieceSizes) == 0 || v.pieceSizes[0] == size { - return k, nil - } + for k, v := range m.unsealedInfoMap.infos { + pads, padLength := ffiwrapper.GetRequiredPadding(abi.PaddedPieceSize(v.stored), size.Padded()) + if v.stored+uint64(size)+uint64(padLength) <= uint64(ss) { + return k, pads, nil } } - return m.newSector() + ns, err := m.newSector() + if err != nil { + return 0, nil, err + } + + m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ + stored: 0, + pieceSizes: nil, + } + + return ns, nil, nil } // newSector creates a new sector for deal storage @@ -222,11 +254,6 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) { }() } - m.unsealedInfos[sid] = UnsealedSectorInfo{ - stored: 0, - pieceSizes: nil, - } - return sid, nil }