Merge pull request #44 from filecoin-project/asr/mutex
Packing deals improvements
This commit is contained in:
commit
1794862d73
2
go.mod
2
go.mod
@ -9,7 +9,7 @@ require (
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect
|
||||
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15
|
||||
github.com/filecoin-project/specs-actors v0.7.1
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
|
||||
github.com/ipfs/go-cid v0.0.5
|
||||
|
9
go.sum
9
go.sum
@ -38,8 +38,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060 h1:/3
|
||||
github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
|
||||
github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU=
|
||||
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
|
||||
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw=
|
||||
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||
@ -55,13 +55,14 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h
|
||||
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
|
||||
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
||||
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg=
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15 h1:miw6hiusb/MkV1ryoqUKKWnvHhPW00AYtyeCj0L8pqo=
|
||||
github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo=
|
||||
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
|
||||
github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0=
|
||||
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
|
||||
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
|
||||
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||
github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||
github.com/filecoin-project/specs-actors v0.7.1 h1:/zW++MN4gGIPvG+s0zmSI97k0Z/aaeiREjLC10gQbco=
|
||||
github.com/filecoin-project/specs-actors v0.7.1/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
|
||||
|
103
sealing.go
103
sealing.go
@ -55,8 +55,8 @@ type Sealing struct {
|
||||
sc SectorIDCounter
|
||||
verif ffiwrapper.Verifier
|
||||
|
||||
unsealedInfos map[abi.SectorNumber]UnsealedSectorInfo
|
||||
pcp PreCommitPolicy
|
||||
pcp PreCommitPolicy
|
||||
unsealedInfoMap UnsealedSectorMap
|
||||
|
||||
upgradeLk sync.Mutex
|
||||
toUpgrade map[abi.SectorNumber]struct{}
|
||||
@ -64,8 +64,13 @@ type Sealing struct {
|
||||
getSealDelay GetSealingDelayFunc
|
||||
}
|
||||
|
||||
type UnsealedSectorMap struct {
|
||||
infos map[abi.SectorNumber]UnsealedSectorInfo
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
type UnsealedSectorInfo struct {
|
||||
// stored should always equal sum of pieceSizes
|
||||
// stored should always equal sum of pieceSizes.Padded()
|
||||
stored uint64
|
||||
pieceSizes []abi.UnpaddedPieceSize
|
||||
}
|
||||
@ -75,12 +80,15 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc
|
||||
api: api,
|
||||
events: events,
|
||||
|
||||
maddr: maddr,
|
||||
sealer: sealer,
|
||||
sc: sc,
|
||||
verif: verif,
|
||||
unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
||||
pcp: pcp,
|
||||
maddr: maddr,
|
||||
sealer: sealer,
|
||||
sc: sc,
|
||||
verif: verif,
|
||||
pcp: pcp,
|
||||
unsealedInfoMap: UnsealedSectorMap{
|
||||
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
||||
mux: sync.Mutex{},
|
||||
},
|
||||
|
||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||
getSealDelay: gsd,
|
||||
@ -113,21 +121,23 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
||||
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
||||
}
|
||||
|
||||
sid, err := m.getAvailableSector(size)
|
||||
m.unsealedInfoMap.mux.Lock()
|
||||
defer m.unsealedInfoMap.mux.Unlock()
|
||||
|
||||
sid, pads, err := m.getSectorAndPadding(size)
|
||||
if err != nil {
|
||||
return 0, 0, xerrors.Errorf("creating new sector: %w", err)
|
||||
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
|
||||
}
|
||||
|
||||
offset := m.unsealedInfos[sid].stored
|
||||
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sid), m.unsealedInfos[sid].pieceSizes, size, r)
|
||||
if err != nil {
|
||||
return 0, 0, xerrors.Errorf("writing piece: %w", err)
|
||||
for _, p := range pads {
|
||||
err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil)
|
||||
if err != nil {
|
||||
return 0, 0, xerrors.Errorf("writing pads: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = m.addPiece(sid, Piece{
|
||||
Piece: ppi,
|
||||
DealInfo: &d,
|
||||
})
|
||||
offset := m.unsealedInfoMap.infos[sid].stored
|
||||
err = m.addPiece(ctx, sid, size, r, &d)
|
||||
|
||||
if err != nil {
|
||||
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
|
||||
@ -136,16 +146,26 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
||||
return sid, offset, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) addPiece(sectorID abi.SectorNumber, piece Piece) error {
|
||||
// Caller should hold m.unsealedInfoMap.mux
|
||||
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
|
||||
log.Infof("Adding piece to sector %d", sectorID)
|
||||
err := m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece})
|
||||
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing piece: %w", err)
|
||||
}
|
||||
piece := Piece{
|
||||
Piece: ppi,
|
||||
DealInfo: di,
|
||||
}
|
||||
|
||||
err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ui := m.unsealedInfos[sectorID]
|
||||
m.unsealedInfos[sectorID] = UnsealedSectorInfo{
|
||||
stored: ui.stored + uint64(piece.Piece.Size.Unpadded()),
|
||||
ui := m.unsealedInfoMap.infos[sectorID]
|
||||
m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{
|
||||
stored: ui.stored + uint64(piece.Piece.Size),
|
||||
pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()),
|
||||
}
|
||||
|
||||
@ -156,6 +176,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||
}
|
||||
|
||||
// Caller should NOT hold m.unsealedInfoMap.mux
|
||||
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
||||
log.Infof("Starting packing sector %d", sectorID)
|
||||
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
|
||||
@ -163,23 +184,34 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(m.unsealedInfos, sectorID)
|
||||
m.unsealedInfoMap.mux.Lock()
|
||||
delete(m.unsealedInfoMap.infos, sectorID)
|
||||
m.unsealedInfoMap.mux.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) getAvailableSector(size abi.UnpaddedPieceSize) (abi.SectorNumber, error) {
|
||||
// Caller should hold m.unsealedInfoMap.mux
|
||||
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
||||
ss := m.sealer.SectorSize()
|
||||
for k, v := range m.unsealedInfos {
|
||||
if v.stored+uint64(size) <= uint64(ss) {
|
||||
// TODO: Support multiple deal sizes in the same sector
|
||||
if len(v.pieceSizes) == 0 || v.pieceSizes[0] == size {
|
||||
return k, nil
|
||||
}
|
||||
for k, v := range m.unsealedInfoMap.infos {
|
||||
pads, padLength := ffiwrapper.GetRequiredPadding(abi.PaddedPieceSize(v.stored), size.Padded())
|
||||
if v.stored+uint64(size)+uint64(padLength) <= uint64(ss) {
|
||||
return k, pads, nil
|
||||
}
|
||||
}
|
||||
|
||||
return m.newSector()
|
||||
ns, err := m.newSector()
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
|
||||
stored: 0,
|
||||
pieceSizes: nil,
|
||||
}
|
||||
|
||||
return ns, nil, nil
|
||||
}
|
||||
|
||||
// newSector creates a new sector for deal storage
|
||||
@ -222,11 +254,6 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) {
|
||||
}()
|
||||
}
|
||||
|
||||
m.unsealedInfos[sid] = UnsealedSectorInfo{
|
||||
stored: 0,
|
||||
pieceSizes: nil,
|
||||
}
|
||||
|
||||
return sid, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user