diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go index 7d94d93e2..00546202d 100644 --- a/chain/deals/provider_states.go +++ b/chain/deals/provider_states.go @@ -219,7 +219,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal) return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") } - sectorID, err := p.secb.AddUnixfsPiece(deal.Ref, uf, deal.DealID) + sectorID, err := p.secb.AddUnixfsPiece(ctx, deal.Ref, uf, deal.DealID) if err != nil { return nil, xerrors.Errorf("AddPiece failed: %s", err) } @@ -228,16 +228,12 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal) return func(deal *MinerDeal) { deal.SectorID = sectorID }, nil - } // SEALING func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - log.Info("About to seal sector!", deal.ProposalCid, deal.SectorID) - if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil { - return nil, xerrors.Errorf("sealing sector failed: %w", err) - } + // TODO: consider waiting for seal to happen return nil, nil } diff --git a/gen/main.go b/gen/main.go index efe1af965..2c9292283 100644 --- a/gen/main.go +++ b/gen/main.go @@ -13,7 +13,6 @@ import ( "github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sector" ) func main() { @@ -153,18 +152,11 @@ func main() { err = gen.WriteTupleEncodersToFile("./storage/cbor_gen.go", "storage", storage.SealTicket{}, + storage.Piece{}, storage.SectorInfo{}, ) if err != nil { fmt.Println(err) os.Exit(1) } - - err = gen.WriteTupleEncodersToFile("./storage/sector/cbor_gen.go", "sector", - sector.DealMapping{}, - ) - if err != nil { - fmt.Println(err) - os.Exit(1) - } } diff --git a/node/builder.go b/node/builder.go index 145a92fe1..cf999ea4b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -41,7 +41,6 @@ import ( "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sector" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -229,9 +228,8 @@ func Online() Option { // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoStorageMiner }, Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), - Override(new(*sector.Store), sector.NewStore), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), - Override(new(sector.TicketFn), modules.SealTicketGen), + Override(new(storage.TicketFn), modules.SealTicketGen), Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.StagingDAG), modules.StagingDAG), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b9efe9f2f..e7b081ea6 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -7,7 +7,6 @@ import ( "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sector" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -16,7 +15,6 @@ type StorageMinerAPI struct { SectorBuilderConfig *sectorbuilder.Config SectorBuilder *sectorbuilder.SectorBuilder - Sectors *sector.Store SectorBlocks *sectorblocks.SectorBlocks Miner *storage.Miner diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 5a9ea4d75..3c827c550 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -28,7 +28,6 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sector" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -81,13 +80,13 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD } } -func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { +func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, tktFn storage.TicketFn) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - sm, err := storage.NewMiner(api, maddr, h, ds, secst) + sm, err := storage.NewMiner(api, maddr, h, ds, sb, tktFn) if err != nil { return nil, err } @@ -177,7 +176,7 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro return nil } -func SealTicketGen(api api.FullNode) sector.TicketFn { +func SealTicketGen(api api.FullNode) storage.TicketFn { return func(ctx context.Context) (*sectorbuilder.SealTicket, error) { ts, err := api.ChainHead(ctx) if err != nil { diff --git a/storage/garbage.go b/storage/garbage.go index 8e3cf1a10..8a2dd7a22 100644 --- a/storage/garbage.go +++ b/storage/garbage.go @@ -17,7 +17,7 @@ import ( ) // TODO: expected sector ID -func (m *Miner) storeGarbage(ctx context.Context, sizes ...uint64) ([]uint64, error) { +func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, sizes ...uint64) ([]Piece, error) { deals := make([]actors.StorageDeal, len(sizes)) for i, size := range sizes { commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) @@ -87,41 +87,47 @@ func (m *Miner) storeGarbage(ctx context.Context, sizes ...uint64) ([]uint64, er return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") } - sectorIDs := make([]uint64, len(sizes)) + out := make([]Piece, len(sizes)) for i, size := range sizes { name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - sectorID, err := m.secst.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), resp.DealIDs[i]) + ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size))) if err != nil { return nil, err } - sectorIDs[i] = sectorID + out[i] = Piece{ + DealID: resp.DealIDs[i], + Ref: name, + Size: ppi.Size, + CommP: ppi.CommP[:], + } } - return sectorIDs, nil + return out, nil } func (m *Miner) StoreGarbageData(_ context.Context) error { ctx := context.TODO() - ssize, err := m.SectorSize(ctx) - if err != nil { - return xerrors.Errorf("failed to get miner sector size: %w", err) - } go func() { - size := sectorbuilder.UserBytesForSectorSize(ssize) + size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) - sids, err := m.storeGarbage(ctx, size) + sid, err := m.sb.AcquireSectorId() if err != nil { log.Errorf("%+v", err) return } - if err := m.SealSector(context.TODO(), sids[0]); err != nil { + pieces, err := m.storeGarbage(ctx, sid, size) + if err != nil { + log.Errorf("%+v", err) + return + } + + if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].Ref, pieces[0].ppi()); err != nil { log.Errorf("%+v", err) return } }() - - return err + return nil } diff --git a/storage/miner.go b/storage/miner.go index 72a6a0a8b..c3ddcc8e5 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,6 +2,7 @@ package storage import ( "context" + "github.com/filecoin-project/lotus/lib/sectorbuilder" "sync" "github.com/filecoin-project/lotus/lib/statestore" @@ -18,7 +19,6 @@ import ( "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/storage/sector" ) var log = logging.Logger("storageminer") @@ -29,7 +29,6 @@ type Miner struct { api storageMinerApi events *events.Events h host.Host - secst *sector.Store maddr address.Address worker address.Address @@ -39,7 +38,9 @@ type Miner struct { schedPost uint64 // Sealing + sb *sectorbuilder.SectorBuilder sectors *statestore.StateStore + tktFn TicketFn sectorIncoming chan *SectorInfo sectorUpdated chan sectorUpdate @@ -73,13 +74,14 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) { +func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, tktFn TicketFn) (*Miner, error) { return &Miner{ api: api, maddr: addr, h: h, - secst: secst, + sb: sb, + tktFn: tktFn, sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))), @@ -132,8 +134,3 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error { log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker) return nil } - -func (m *Miner) SectorSize(ctx context.Context) (uint64, error) { - // TODO: cache this - return m.api.StateMinerSectorSize(ctx, m.maddr, nil) -} diff --git a/storage/post.go b/storage/post.go index ff21c48f7..0f4c74f38 100644 --- a/storage/post.go +++ b/storage/post.go @@ -2,6 +2,7 @@ package storage import ( "context" + "github.com/filecoin-project/lotus/lib/sectorbuilder" "time" "github.com/ipfs/go-cid" @@ -158,6 +159,21 @@ func (p *post) preparePost(ctx context.Context) error { return nil } +func (p *post) sortedSectorInfo() sectorbuilder.SortedSectorInfo { + sbsi := make([]sectorbuilder.SectorInfo, len(p.sset)) + for k, sector := range p.sset { + var commR [sectorbuilder.CommLen]byte + copy(commR[:], sector.CommR) + + sbsi[k] = sectorbuilder.SectorInfo{ + SectorID: sector.SectorID, + CommR: commR, + } + } + + return sectorbuilder.NewSortedSectorInfo(sbsi) +} + func (p *post) runPost(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "storage.runPost") defer span.End() @@ -168,7 +184,11 @@ func (p *post) runPost(ctx context.Context) error { tsStart := time.Now() var faults []uint64 // TODO - proof, err := p.m.secst.RunPoSt(ctx, p.sset, p.r, faults) + + var seed [32]byte + copy(seed[:], p.r) + + proof, err := p.m.sb.GeneratePoSt(p.sortedSectorInfo(), seed, faults) if err != nil { return xerrors.Errorf("running post failed: %w", err) } diff --git a/storage/sealing.go b/storage/sealing.go index a2723680d..4000d46fe 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -2,25 +2,56 @@ package storage import ( "context" + "io" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" cid "github.com/ipfs/go-cid" xerrors "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" ) +type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) + type SealTicket struct { BlockHeight uint64 TicketBytes []byte } +func (t *SealTicket) sb() sectorbuilder.SealTicket { + out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight} + copy(out.TicketBytes[:], t.TicketBytes) + return out +} + +type Piece struct { + DealID uint64 + Ref string + + Size uint64 + CommP []byte +} + +func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { + out.Size = p.Size + copy(out.CommP[:], p.CommP) + return out +} + type SectorInfo struct { State api.SectorState SectorID uint64 + // Packing + + Pieces []Piece + // PreCommit - CommD []byte - CommR []byte + CommC []byte + CommD []byte + CommR []byte + CommRLast []byte Ticket SealTicket PreCommitMessage *cid.Cid @@ -40,6 +71,41 @@ type sectorUpdate struct { mut func(*SectorInfo) } +func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { + out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.ppi() + } + return out +} + +func (t *SectorInfo) deals() []uint64 { + out := make([]uint64, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.DealID + } + return out +} + +func (t *SectorInfo) refs() []string { + out := make([]string, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.Ref + } + return out +} + +func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { + var out sectorbuilder.RawSealPreCommitOutput + + copy(out.CommC[:], t.CommC) + copy(out.CommD[:], t.CommD) + copy(out.CommR[:], t.CommR) + copy(out.CommRLast[:], t.CommRLast) + + return out +} + func (m *Miner) sectorStateLoop(ctx context.Context) { // TODO: restore state @@ -66,7 +132,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { return } if has { - log.Warnf("SealSector called more than once for sector %d", sector.SectorID) + log.Warnf("SealPiece called more than once for sector %d", sector.SectorID) return } @@ -129,12 +195,36 @@ func (m *Miner) failSector(id uint64, err error) { panic(err) // todo: better error handling strategy } -func (m *Miner) SealSector(ctx context.Context, sid uint64) error { - log.Infof("Begin sealing sector %d", sid) +func (m *Miner) SealPiece(ctx context.Context, ref string, size uint64, r io.Reader, dealID uint64) (uint64, error) { + log.Infof("Seal piece for deal %d", dealID) + sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector + if err != nil { + return 0, xerrors.Errorf("acquiring sector ID: %w", err) + } + + ppi, err := m.sb.AddPiece(size, sid, r) + if err != nil { + return 0, xerrors.Errorf("adding piece to sector: %w", err) + } + + return sid, m.newSector(ctx, sid, dealID, ref, ppi) +} + +func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ref string, ppi sectorbuilder.PublicPieceInfo) error { si := &SectorInfo{ State: api.UndefinedSectorState, SectorID: sid, + + Pieces: []Piece{ + { + DealID: dealID, + Ref:ref, + + Size: ppi.Size, + CommP: ppi.CommP[:], + }, + }, } select { case m.sectorIncoming <- si: diff --git a/storage/sealing_utils.go b/storage/sealing_utils.go new file mode 100644 index 000000000..17d83abbf --- /dev/null +++ b/storage/sealing_utils.go @@ -0,0 +1,20 @@ +package storage + +import ( + "math/bits" + + "github.com/filecoin-project/lotus/lib/sectorbuilder" +) + +func fillersFromRem(toFill uint64) ([]uint64, error) { + toFill += toFill / 127 // convert to in-sector bytes for easier math + + out := make([]uint64, bits.OnesCount64(toFill)) + for i := range out { + next := bits.TrailingZeros64(toFill) + psize := uint64(1) << next + toFill ^= psize + out[i] = sectorbuilder.UserBytesForSectorSize(psize) + } + return out, nil +} diff --git a/storage/sector/store_test.go b/storage/sealing_utils_test.go similarity index 55% rename from storage/sector/store_test.go rename to storage/sealing_utils_test.go index 78d7ccd0e..6a975e664 100644 --- a/storage/sector/store_test.go +++ b/storage/sealing_utils_test.go @@ -1,18 +1,11 @@ -package sector +package storage import ( - "context" - "fmt" - "github.com/filecoin-project/lotus/lib/padreader" - "io" - "math/rand" "testing" "github.com/stretchr/testify/assert" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/ipfs/go-datastore" ) func testFill(t *testing.T, n uint64, exp []uint64) { @@ -51,35 +44,3 @@ func TestFillersFromRem(t *testing.T) { } } - -func TestSectorStore(t *testing.T) { - if err := build.GetParams(true); err != nil { - t.Fatal(err) - } - - sb, cleanup, err := sectorbuilder.TempSectorbuilder(1024) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - tktFn := func(context.Context) (*sectorbuilder.SealTicket, error) { - return §orbuilder.SealTicket{ - BlockHeight: 17, - TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2}, - }, nil - } - - ds := datastore.NewMapDatastore() - - store := NewStore(sb, ds, tktFn) - - pr := io.LimitReader(rand.New(rand.NewSource(17)), 300) - pr, n := padreader.New(pr, 300) - - sid, err := store.AddPiece("a", n, pr, 1) - if err != nil { - t.Fatal(err) - } - fmt.Println(sid) -} diff --git a/storage/sector/cbor_gen.go b/storage/sector/cbor_gen.go deleted file mode 100644 index 11a2365f1..000000000 --- a/storage/sector/cbor_gen.go +++ /dev/null @@ -1,119 +0,0 @@ -package sector - -import ( - "fmt" - "io" - - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -/* This file was generated by github.com/whyrusleeping/cbor-gen */ - -var _ = xerrors.Errorf - -func (t *DealMapping) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{131}); err != nil { - return err - } - - // t.t.DealIDs ([]uint64) (slice) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.DealIDs)))); err != nil { - return err - } - for _, v := range t.DealIDs { - if err := cbg.CborWriteHeader(w, cbg.MajUnsignedInt, v); err != nil { - return err - } - } - - // t.t.Allocated (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Allocated))); err != nil { - return err - } - - // t.t.Committed (bool) (bool) - if err := cbg.WriteBool(w, t.Committed); err != nil { - return err - } - return nil -} - -func (t *DealMapping) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 3 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.t.DealIDs ([]uint64) (slice) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if extra > 8192 { - return fmt.Errorf("t.DealIDs: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - if extra > 0 { - t.DealIDs = make([]uint64, extra) - } - for i := 0; i < int(extra); i++ { - - maj, val, err := cbg.CborReadHeader(br) - if err != nil { - return xerrors.Errorf("failed to read uint64 for t.DealIDs slice: %w", err) - } - - if maj != cbg.MajUnsignedInt { - return xerrors.Errorf("value read for array t.DealIDs was not a uint, instead got %d", maj) - } - - t.DealIDs[i] = val - } - - // t.t.Allocated (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Allocated = uint64(extra) - // t.t.Committed (bool) (bool) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajOther { - return fmt.Errorf("booleans must be major type 7") - } - switch extra { - case 20: - t.Committed = false - case 21: - t.Committed = true - default: - return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) - } - return nil -} diff --git a/storage/sector/store.go b/storage/sector/store.go deleted file mode 100644 index 47bd5af13..000000000 --- a/storage/sector/store.go +++ /dev/null @@ -1,238 +0,0 @@ -package sector - -import ( - "bytes" - "context" - "fmt" - "github.com/filecoin-project/go-sectorbuilder/sealing_state" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - logging "github.com/ipfs/go-log" - "golang.org/x/xerrors" - "io" - "math/bits" - "sync" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/cborutil" - "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/node/modules/dtypes" -) - -var log = logging.Logger("sectorstore") - -var sectorDealsPrefix = datastore.NewKey("/sectordeals") - -type DealMapping struct { - DealIDs []uint64 - Allocated uint64 - Committed bool -} - -type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) - -// TODO: eventually handle sector storage here instead of in rust-sectorbuilder -type Store struct { - sb *sectorbuilder.SectorBuilder - tktFn TicketFn - - dealsLk sync.Mutex - deals datastore.Datastore -} - -func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store { - return &Store{ - sb: sb, - tktFn: tktFn, - deals: namespace.Wrap(ds, sectorDealsPrefix), - } -} - -func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, error) { - status, err := s.sb.SealStatus(sid) - if err != nil { - return nil, err - } - - return &status, nil -} - -func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) { - sectorID, err = s.sb.AddPiece(ref, size, r) - if err != nil { - return 0, err - } - - s.dealsLk.Lock() - defer s.dealsLk.Unlock() - - k := datastore.NewKey(fmt.Sprint(sectorID)) - e, err := s.deals.Get(k) - var deals DealMapping - switch err { - case nil: - if err := cborutil.ReadCborRPC(bytes.NewReader(e), &deals); err != nil { - return 0, err - } - if deals.Committed { - return 0, xerrors.Errorf("sector %d already committed", sectorID) - } - fallthrough - case datastore.ErrNotFound: - deals.DealIDs = append(deals.DealIDs, dealIDs...) - deals.Allocated += size - - d, err := cborutil.Dump(&deals) - if err != nil { - return 0, err - } - if err := s.deals.Put(k, d); err != nil { - return 0, err - } - default: - return 0, err - } - - return sectorID, nil -} - -func (s *Store) PieceSizesToFill(sectorID uint64) ([]uint64, error) { - s.dealsLk.Lock() - defer s.dealsLk.Unlock() - - k := datastore.NewKey(fmt.Sprint(sectorID)) - e, err := s.deals.Get(k) - if err != nil { - return nil, err - } - var info DealMapping - if err := cborutil.ReadCborRPC(bytes.NewReader(e), &info); err != nil { - return nil, err - } - if info.Allocated > s.sb.SectorSize() { - return nil, xerrors.Errorf("more data allocated in sector than should be able to fit: %d > %d", info.Allocated, s.sb.SectorSize()) - } - - return fillersFromRem(sectorbuilder.UserBytesForSectorSize(s.sb.SectorSize()) - info.Allocated) -} - -func fillersFromRem(toFill uint64) ([]uint64, error) { - toFill += toFill / 127 // convert to in-sector bytes for easier math - - out := make([]uint64, bits.OnesCount64(toFill)) - for i := range out { - next := bits.TrailingZeros64(toFill) - psize := uint64(1) << next - toFill ^= psize - out[i] = sectorbuilder.UserBytesForSectorSize(psize) - } - return out, nil -} - -func (s *Store) DealsForCommit(sectorID uint64, commit bool) ([]uint64, error) { - s.dealsLk.Lock() - defer s.dealsLk.Unlock() - - k := datastore.NewKey(fmt.Sprint(sectorID)) - e, err := s.deals.Get(k) - - switch err { - case nil: - var deals DealMapping - if err := cborutil.ReadCborRPC(bytes.NewReader(e), &deals); err != nil { - return nil, err - } - if !commit { - return nil, nil - } - - if deals.Committed { - log.Errorf("getting deal IDs for sector %d: sector already marked as committed", sectorID) - } - - deals.Committed = true - d, err := cborutil.Dump(&deals) - if err != nil { - return nil, err - } - if err := s.deals.Put(k, d); err != nil { - return nil, err - } - - return deals.DealIDs, nil - case datastore.ErrNotFound: - log.Errorf("getting deal IDs for sector %d failed: %s", err) - return []uint64{}, nil - default: - return nil, err - } -} - -func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) (sectorbuilder.SealPreCommitOutput, error) { - tkt, err := s.tktFn(ctx) - if err != nil { - return sectorbuilder.SealPreCommitOutput{}, err - } - - return s.sb.SealPreCommit(sectorID, *tkt) -} - -func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height uint64, rand []byte) ([]byte, error) { - var tick [32]byte - copy(tick[:], rand) - - sco, err := s.sb.SealCommit(sectorID, sectorbuilder.SealSeed{ - BlockHeight: height, - TicketBytes: tick, - }) - if err != nil { - return nil, err - } - return sco.Proof, nil -} - -func (s *Store) Committed() ([]sectorbuilder.SectorSealingStatus, error) { - l, err := s.sb.GetAllStagedSectors() - if err != nil { - return nil, err - } - - out := make([]sectorbuilder.SectorSealingStatus, 0) - for _, sid := range l { - status, err := s.sb.SealStatus(sid) - if err != nil { - return nil, err - } - - if status.State != sealing_state.Committed { - continue - } - out = append(out, status) - } - - return out, nil -} - -func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { - sbsi := make([]sectorbuilder.SectorInfo, len(sectors)) - for k, sector := range sectors { - var commR [sectorbuilder.CommLen]byte - if copy(commR[:], sector.CommR) != sectorbuilder.CommLen { - return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR)) - } - - sbsi[k] = sectorbuilder.SectorInfo{ - SectorID: sector.SectorID, - CommR: commR, - } - } - - ssi := sectorbuilder.NewSortedSectorInfo(sbsi) - - var seed [sectorbuilder.CommLen]byte - if copy(seed[:], r) != sectorbuilder.CommLen { - return nil, xerrors.Errorf("random seed too short, %d bytes", len(r)) - } - - return s.sb.GeneratePoSt(ssi, seed, faults) -} diff --git a/storage/sector_states.go b/storage/sector_states.go index b52402c4f..ded491192 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -2,6 +2,7 @@ package storage import ( "context" + "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" @@ -37,7 +38,16 @@ func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandle func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) - fillerSizes, err := m.secst.PieceSizesToFill(sector.SectorID) + var allocated uint64 + for _, piece := range sector.Pieces { + allocated += piece.Size + } + + if allocated > m.sb.SectorSize() { + return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, m.sb.SectorSize()) + } + + fillerSizes, err := fillersFromRem(m.sb.SectorSize() - allocated) if err != nil { return nil, err } @@ -46,49 +56,47 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) } - ids, err := m.storeGarbage(ctx, fillerSizes...) + pieces, err := m.storeGarbage(ctx, sector.SectorID, fillerSizes...) if err != nil { return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } - for _, id := range ids { - if id != sector.SectorID { - panic("todo: pass SectorID into storeGarbage") - } - } - - return nil, nil + return func(info *SectorInfo) { + info.Pieces = append(info.Pieces, pieces...) + }, nil } func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { log.Infow("performing sector replication...", "sector", sector.SectorID) - sinfo, err := m.secst.SealPreCommit(ctx, sector.SectorID) + ticket, err := m.tktFn(ctx) + if err != nil { + return nil, err + } + + rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { return nil, xerrors.Errorf("seal pre commit failed: %w", err) } return func(info *SectorInfo) { - info.CommD = sinfo.CommD[:] - info.CommR = sinfo.CommR[:] + info.CommC = rspco.CommC[:] + info.CommD = rspco.CommD[:] + info.CommR = rspco.CommR[:] + info.CommRLast = rspco.CommRLast[:] info.Ticket = SealTicket{ - BlockHeight: sinfo.Ticket.BlockHeight, - TicketBytes: sinfo.Ticket.TicketBytes[:], + BlockHeight: ticket.BlockHeight, + TicketBytes: ticket.TicketBytes[:], } }, nil } func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { - deals, err := m.secst.DealsForCommit(sector.SectorID, false) - if err != nil { - return nil, err - } - params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, CommR: sector.CommR, SealEpoch: sector.Ticket.BlockHeight, - DealIDs: deals, + DealIDs: sector.deals(), } enc, aerr := actors.SerializeParams(params) if aerr != nil { @@ -164,20 +172,20 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector return nil, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) } - proof, err := m.secst.SealComputeProof(ctx, sector.SectorID, sector.RandHeight, rand) + seed := sectorbuilder.SealSeed{ + BlockHeight: sector.RandHeight, + } + copy(seed.TicketBytes[:], rand) + + proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.sb(), seed, sector.pieceInfos(), sector.refs(), sector.rspco()) if err != nil { return nil, xerrors.Errorf("computing seal proof failed: %w", err) } - deals, err := m.secst.DealsForCommit(sector.SectorID, true) - if err != nil { - return nil, err - } - params := &actors.SectorProveCommitInfo{ Proof: proof, SectorID: sector.SectorID, - DealIDs: deals, + DealIDs: sector.deals(), } enc, aerr := actors.SerializeParams(params) diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 9f40742d3..e1f64ae3e 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -23,7 +23,7 @@ import ( "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage/sector" + "github.com/filecoin-project/lotus/storage" ) type SealSerialization uint8 @@ -38,7 +38,7 @@ var imBlocksPrefix = datastore.NewKey("/intermediate") var ErrNotFound = errors.New("not found") type SectorBlocks struct { - *sector.Store + *storage.Miner intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore @@ -47,9 +47,9 @@ type SectorBlocks struct { keyLk sync.Mutex } -func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks { +func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks { sbc := &SectorBlocks{ - Store: sectst, + Miner: miner, intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)), @@ -160,7 +160,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) { } } -func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) { +func (st *SectorBlocks) AddUnixfsPiece(ctx context.Context, ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) { size, err := r.Size() if err != nil { return 0, err @@ -175,7 +175,7 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint6 pr, psize := padreader.New(r, uint64(size)) - return st.Store.AddPiece(refst.pieceRef, psize, pr, dealID) + return st.Miner.SealPiece(ctx, refst.pieceRef, psize, pr, dealID) } func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {