Sector storage refactor

This commit is contained in:
Łukasz Magiera 2019-11-07 19:22:59 +01:00
parent 69b4bd9fb4
commit 2d26a4edf7
15 changed files with 213 additions and 485 deletions

View File

@ -219,7 +219,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
} }
sectorID, err := p.secb.AddUnixfsPiece(deal.Ref, uf, deal.DealID) sectorID, err := p.secb.AddUnixfsPiece(ctx, deal.Ref, uf, deal.DealID)
if err != nil { if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err) return nil, xerrors.Errorf("AddPiece failed: %s", err)
} }
@ -228,16 +228,12 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return func(deal *MinerDeal) { return func(deal *MinerDeal) {
deal.SectorID = sectorID deal.SectorID = sectorID
}, nil }, nil
} }
// SEALING // SEALING
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
log.Info("About to seal sector!", deal.ProposalCid, deal.SectorID) // TODO: consider waiting for seal to happen
if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil {
return nil, xerrors.Errorf("sealing sector failed: %w", err)
}
return nil, nil return nil, nil
} }

View File

@ -13,7 +13,6 @@ import (
"github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/paych"
"github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sector"
) )
func main() { func main() {
@ -153,18 +152,11 @@ func main() {
err = gen.WriteTupleEncodersToFile("./storage/cbor_gen.go", "storage", err = gen.WriteTupleEncodersToFile("./storage/cbor_gen.go", "storage",
storage.SealTicket{}, storage.SealTicket{},
storage.Piece{},
storage.SectorInfo{}, storage.SectorInfo{},
) )
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
err = gen.WriteTupleEncodersToFile("./storage/sector/cbor_gen.go", "sector",
sector.DealMapping{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} }

View File

@ -41,7 +41,6 @@ import (
"github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sector"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
) )
@ -229,9 +228,8 @@ func Online() Option {
// Storage miner // Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoStorageMiner }, ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoStorageMiner },
Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
Override(new(*sector.Store), sector.NewStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(sector.TicketFn), modules.SealTicketGen), Override(new(storage.TicketFn), modules.SealTicketGen),
Override(new(*storage.Miner), modules.StorageMiner), Override(new(*storage.Miner), modules.StorageMiner),
Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingDAG), modules.StagingDAG),

View File

@ -7,7 +7,6 @@ import (
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sector"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
) )
@ -16,7 +15,6 @@ type StorageMinerAPI struct {
SectorBuilderConfig *sectorbuilder.Config SectorBuilderConfig *sectorbuilder.Config
SectorBuilder *sectorbuilder.SectorBuilder SectorBuilder *sectorbuilder.SectorBuilder
Sectors *sector.Store
SectorBlocks *sectorblocks.SectorBlocks SectorBlocks *sectorblocks.SectorBlocks
Miner *storage.Miner Miner *storage.Miner

View File

@ -28,7 +28,6 @@ import (
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sector"
) )
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
@ -81,13 +80,13 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD
} }
} }
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, tktFn storage.TicketFn) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sm, err := storage.NewMiner(api, maddr, h, ds, secst) sm, err := storage.NewMiner(api, maddr, h, ds, sb, tktFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -177,7 +176,7 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro
return nil return nil
} }
func SealTicketGen(api api.FullNode) sector.TicketFn { func SealTicketGen(api api.FullNode) storage.TicketFn {
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) { return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
ts, err := api.ChainHead(ctx) ts, err := api.ChainHead(ctx)
if err != nil { if err != nil {

View File

@ -17,7 +17,7 @@ import (
) )
// TODO: expected sector ID // TODO: expected sector ID
func (m *Miner) storeGarbage(ctx context.Context, sizes ...uint64) ([]uint64, error) { func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, sizes ...uint64) ([]Piece, error) {
deals := make([]actors.StorageDeal, len(sizes)) deals := make([]actors.StorageDeal, len(sizes))
for i, size := range sizes { for i, size := range sizes {
commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size)
@ -87,41 +87,47 @@ func (m *Miner) storeGarbage(ctx context.Context, sizes ...uint64) ([]uint64, er
return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
} }
sectorIDs := make([]uint64, len(sizes)) out := make([]Piece, len(sizes))
for i, size := range sizes { for i, size := range sizes {
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
sectorID, err := m.secst.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), resp.DealIDs[i]) ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
sectorIDs[i] = sectorID out[i] = Piece{
DealID: resp.DealIDs[i],
Ref: name,
Size: ppi.Size,
CommP: ppi.CommP[:],
}
} }
return sectorIDs, nil return out, nil
} }
func (m *Miner) StoreGarbageData(_ context.Context) error { func (m *Miner) StoreGarbageData(_ context.Context) error {
ctx := context.TODO() ctx := context.TODO()
ssize, err := m.SectorSize(ctx)
if err != nil {
return xerrors.Errorf("failed to get miner sector size: %w", err)
}
go func() { go func() {
size := sectorbuilder.UserBytesForSectorSize(ssize) size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
sids, err := m.storeGarbage(ctx, size) sid, err := m.sb.AcquireSectorId()
if err != nil { if err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return return
} }
if err := m.SealSector(context.TODO(), sids[0]); err != nil { pieces, err := m.storeGarbage(ctx, sid, size)
if err != nil {
log.Errorf("%+v", err)
return
}
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].Ref, pieces[0].ppi()); err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return return
} }
}() }()
return nil
return err
} }

View File

@ -2,6 +2,7 @@ package storage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"sync" "sync"
"github.com/filecoin-project/lotus/lib/statestore" "github.com/filecoin-project/lotus/lib/statestore"
@ -18,7 +19,6 @@ import (
"github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sector"
) )
var log = logging.Logger("storageminer") var log = logging.Logger("storageminer")
@ -29,7 +29,6 @@ type Miner struct {
api storageMinerApi api storageMinerApi
events *events.Events events *events.Events
h host.Host h host.Host
secst *sector.Store
maddr address.Address maddr address.Address
worker address.Address worker address.Address
@ -39,7 +38,9 @@ type Miner struct {
schedPost uint64 schedPost uint64
// Sealing // Sealing
sb *sectorbuilder.SectorBuilder
sectors *statestore.StateStore sectors *statestore.StateStore
tktFn TicketFn
sectorIncoming chan *SectorInfo sectorIncoming chan *SectorInfo
sectorUpdated chan sectorUpdate sectorUpdated chan sectorUpdate
@ -73,13 +74,14 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) { func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, tktFn TicketFn) (*Miner, error) {
return &Miner{ return &Miner{
api: api, api: api,
maddr: addr, maddr: addr,
h: h, h: h,
secst: secst, sb: sb,
tktFn: tktFn,
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))), sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))),
@ -132,8 +134,3 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker) log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker)
return nil return nil
} }
func (m *Miner) SectorSize(ctx context.Context) (uint64, error) {
// TODO: cache this
return m.api.StateMinerSectorSize(ctx, m.maddr, nil)
}

View File

@ -2,6 +2,7 @@ package storage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -158,6 +159,21 @@ func (p *post) preparePost(ctx context.Context) error {
return nil return nil
} }
func (p *post) sortedSectorInfo() sectorbuilder.SortedSectorInfo {
sbsi := make([]sectorbuilder.SectorInfo, len(p.sset))
for k, sector := range p.sset {
var commR [sectorbuilder.CommLen]byte
copy(commR[:], sector.CommR)
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
return sectorbuilder.NewSortedSectorInfo(sbsi)
}
func (p *post) runPost(ctx context.Context) error { func (p *post) runPost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.runPost") ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End() defer span.End()
@ -168,7 +184,11 @@ func (p *post) runPost(ctx context.Context) error {
tsStart := time.Now() tsStart := time.Now()
var faults []uint64 // TODO var faults []uint64 // TODO
proof, err := p.m.secst.RunPoSt(ctx, p.sset, p.r, faults)
var seed [32]byte
copy(seed[:], p.r)
proof, err := p.m.sb.GeneratePoSt(p.sortedSectorInfo(), seed, faults)
if err != nil { if err != nil {
return xerrors.Errorf("running post failed: %w", err) return xerrors.Errorf("running post failed: %w", err)
} }

View File

@ -2,25 +2,56 @@ package storage
import ( import (
"context" "context"
"io"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
) )
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type SealTicket struct { type SealTicket struct {
BlockHeight uint64 BlockHeight uint64
TicketBytes []byte TicketBytes []byte
} }
func (t *SealTicket) sb() sectorbuilder.SealTicket {
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type Piece struct {
DealID uint64
Ref string
Size uint64
CommP []byte
}
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
out.Size = p.Size
copy(out.CommP[:], p.CommP)
return out
}
type SectorInfo struct { type SectorInfo struct {
State api.SectorState State api.SectorState
SectorID uint64 SectorID uint64
// Packing
Pieces []Piece
// PreCommit // PreCommit
CommC []byte
CommD []byte CommD []byte
CommR []byte CommR []byte
CommRLast []byte
Ticket SealTicket Ticket SealTicket
PreCommitMessage *cid.Cid PreCommitMessage *cid.Cid
@ -40,6 +71,41 @@ type sectorUpdate struct {
mut func(*SectorInfo) mut func(*SectorInfo)
} }
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.ppi()
}
return out
}
func (t *SectorInfo) deals() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.DealID
}
return out
}
func (t *SectorInfo) refs() []string {
out := make([]string, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.Ref
}
return out
}
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out
}
func (m *Miner) sectorStateLoop(ctx context.Context) { func (m *Miner) sectorStateLoop(ctx context.Context) {
// TODO: restore state // TODO: restore state
@ -66,7 +132,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) {
return return
} }
if has { if has {
log.Warnf("SealSector called more than once for sector %d", sector.SectorID) log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
return return
} }
@ -129,12 +195,36 @@ func (m *Miner) failSector(id uint64, err error) {
panic(err) // todo: better error handling strategy panic(err) // todo: better error handling strategy
} }
func (m *Miner) SealSector(ctx context.Context, sid uint64) error { func (m *Miner) SealPiece(ctx context.Context, ref string, size uint64, r io.Reader, dealID uint64) (uint64, error) {
log.Infof("Begin sealing sector %d", sid) log.Infof("Seal piece for deal %d", dealID)
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
if err != nil {
return 0, xerrors.Errorf("acquiring sector ID: %w", err)
}
ppi, err := m.sb.AddPiece(size, sid, r)
if err != nil {
return 0, xerrors.Errorf("adding piece to sector: %w", err)
}
return sid, m.newSector(ctx, sid, dealID, ref, ppi)
}
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ref string, ppi sectorbuilder.PublicPieceInfo) error {
si := &SectorInfo{ si := &SectorInfo{
State: api.UndefinedSectorState, State: api.UndefinedSectorState,
SectorID: sid, SectorID: sid,
Pieces: []Piece{
{
DealID: dealID,
Ref:ref,
Size: ppi.Size,
CommP: ppi.CommP[:],
},
},
} }
select { select {
case m.sectorIncoming <- si: case m.sectorIncoming <- si:

20
storage/sealing_utils.go Normal file
View File

@ -0,0 +1,20 @@
package storage
import (
"math/bits"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
func fillersFromRem(toFill uint64) ([]uint64, error) {
toFill += toFill / 127 // convert to in-sector bytes for easier math
out := make([]uint64, bits.OnesCount64(toFill))
for i := range out {
next := bits.TrailingZeros64(toFill)
psize := uint64(1) << next
toFill ^= psize
out[i] = sectorbuilder.UserBytesForSectorSize(psize)
}
return out, nil
}

View File

@ -1,18 +1,11 @@
package sector package storage
import ( import (
"context"
"fmt"
"github.com/filecoin-project/lotus/lib/padreader"
"io"
"math/rand"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/ipfs/go-datastore"
) )
func testFill(t *testing.T, n uint64, exp []uint64) { func testFill(t *testing.T, n uint64, exp []uint64) {
@ -51,35 +44,3 @@ func TestFillersFromRem(t *testing.T) {
} }
} }
func TestSectorStore(t *testing.T) {
if err := build.GetParams(true); err != nil {
t.Fatal(err)
}
sb, cleanup, err := sectorbuilder.TempSectorbuilder(1024)
if err != nil {
t.Fatal(err)
}
defer cleanup()
tktFn := func(context.Context) (*sectorbuilder.SealTicket, error) {
return &sectorbuilder.SealTicket{
BlockHeight: 17,
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2},
}, nil
}
ds := datastore.NewMapDatastore()
store := NewStore(sb, ds, tktFn)
pr := io.LimitReader(rand.New(rand.NewSource(17)), 300)
pr, n := padreader.New(pr, 300)
sid, err := store.AddPiece("a", n, pr, 1)
if err != nil {
t.Fatal(err)
}
fmt.Println(sid)
}

View File

@ -1,119 +0,0 @@
package sector
import (
"fmt"
"io"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
/* This file was generated by github.com/whyrusleeping/cbor-gen */
var _ = xerrors.Errorf
func (t *DealMapping) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
return err
}
// t.t.DealIDs ([]uint64) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.DealIDs)))); err != nil {
return err
}
for _, v := range t.DealIDs {
if err := cbg.CborWriteHeader(w, cbg.MajUnsignedInt, v); err != nil {
return err
}
}
// t.t.Allocated (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Allocated))); err != nil {
return err
}
// t.t.Committed (bool) (bool)
if err := cbg.WriteBool(w, t.Committed); err != nil {
return err
}
return nil
}
func (t *DealMapping) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.DealIDs ([]uint64) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.DealIDs: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.DealIDs = make([]uint64, extra)
}
for i := 0; i < int(extra); i++ {
maj, val, err := cbg.CborReadHeader(br)
if err != nil {
return xerrors.Errorf("failed to read uint64 for t.DealIDs slice: %w", err)
}
if maj != cbg.MajUnsignedInt {
return xerrors.Errorf("value read for array t.DealIDs was not a uint, instead got %d", maj)
}
t.DealIDs[i] = val
}
// t.t.Allocated (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Allocated = uint64(extra)
// t.t.Committed (bool) (bool)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajOther {
return fmt.Errorf("booleans must be major type 7")
}
switch extra {
case 20:
t.Committed = false
case 21:
t.Committed = true
default:
return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
}
return nil
}

View File

@ -1,238 +0,0 @@
package sector
import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"io"
"math/bits"
"sync"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var log = logging.Logger("sectorstore")
var sectorDealsPrefix = datastore.NewKey("/sectordeals")
type DealMapping struct {
DealIDs []uint64
Allocated uint64
Committed bool
}
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct {
sb *sectorbuilder.SectorBuilder
tktFn TicketFn
dealsLk sync.Mutex
deals datastore.Datastore
}
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store {
return &Store{
sb: sb,
tktFn: tktFn,
deals: namespace.Wrap(ds, sectorDealsPrefix),
}
}
func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, error) {
status, err := s.sb.SealStatus(sid)
if err != nil {
return nil, err
}
return &status, nil
}
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) {
sectorID, err = s.sb.AddPiece(ref, size, r)
if err != nil {
return 0, err
}
s.dealsLk.Lock()
defer s.dealsLk.Unlock()
k := datastore.NewKey(fmt.Sprint(sectorID))
e, err := s.deals.Get(k)
var deals DealMapping
switch err {
case nil:
if err := cborutil.ReadCborRPC(bytes.NewReader(e), &deals); err != nil {
return 0, err
}
if deals.Committed {
return 0, xerrors.Errorf("sector %d already committed", sectorID)
}
fallthrough
case datastore.ErrNotFound:
deals.DealIDs = append(deals.DealIDs, dealIDs...)
deals.Allocated += size
d, err := cborutil.Dump(&deals)
if err != nil {
return 0, err
}
if err := s.deals.Put(k, d); err != nil {
return 0, err
}
default:
return 0, err
}
return sectorID, nil
}
func (s *Store) PieceSizesToFill(sectorID uint64) ([]uint64, error) {
s.dealsLk.Lock()
defer s.dealsLk.Unlock()
k := datastore.NewKey(fmt.Sprint(sectorID))
e, err := s.deals.Get(k)
if err != nil {
return nil, err
}
var info DealMapping
if err := cborutil.ReadCborRPC(bytes.NewReader(e), &info); err != nil {
return nil, err
}
if info.Allocated > s.sb.SectorSize() {
return nil, xerrors.Errorf("more data allocated in sector than should be able to fit: %d > %d", info.Allocated, s.sb.SectorSize())
}
return fillersFromRem(sectorbuilder.UserBytesForSectorSize(s.sb.SectorSize()) - info.Allocated)
}
func fillersFromRem(toFill uint64) ([]uint64, error) {
toFill += toFill / 127 // convert to in-sector bytes for easier math
out := make([]uint64, bits.OnesCount64(toFill))
for i := range out {
next := bits.TrailingZeros64(toFill)
psize := uint64(1) << next
toFill ^= psize
out[i] = sectorbuilder.UserBytesForSectorSize(psize)
}
return out, nil
}
func (s *Store) DealsForCommit(sectorID uint64, commit bool) ([]uint64, error) {
s.dealsLk.Lock()
defer s.dealsLk.Unlock()
k := datastore.NewKey(fmt.Sprint(sectorID))
e, err := s.deals.Get(k)
switch err {
case nil:
var deals DealMapping
if err := cborutil.ReadCborRPC(bytes.NewReader(e), &deals); err != nil {
return nil, err
}
if !commit {
return nil, nil
}
if deals.Committed {
log.Errorf("getting deal IDs for sector %d: sector already marked as committed", sectorID)
}
deals.Committed = true
d, err := cborutil.Dump(&deals)
if err != nil {
return nil, err
}
if err := s.deals.Put(k, d); err != nil {
return nil, err
}
return deals.DealIDs, nil
case datastore.ErrNotFound:
log.Errorf("getting deal IDs for sector %d failed: %s", err)
return []uint64{}, nil
default:
return nil, err
}
}
func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) (sectorbuilder.SealPreCommitOutput, error) {
tkt, err := s.tktFn(ctx)
if err != nil {
return sectorbuilder.SealPreCommitOutput{}, err
}
return s.sb.SealPreCommit(sectorID, *tkt)
}
func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height uint64, rand []byte) ([]byte, error) {
var tick [32]byte
copy(tick[:], rand)
sco, err := s.sb.SealCommit(sectorID, sectorbuilder.SealSeed{
BlockHeight: height,
TicketBytes: tick,
})
if err != nil {
return nil, err
}
return sco.Proof, nil
}
func (s *Store) Committed() ([]sectorbuilder.SectorSealingStatus, error) {
l, err := s.sb.GetAllStagedSectors()
if err != nil {
return nil, err
}
out := make([]sectorbuilder.SectorSealingStatus, 0)
for _, sid := range l {
status, err := s.sb.SealStatus(sid)
if err != nil {
return nil, err
}
if status.State != sealing_state.Committed {
continue
}
out = append(out, status)
}
return out, nil
}
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
for k, sector := range sectors {
var commR [sectorbuilder.CommLen]byte
if copy(commR[:], sector.CommR) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR))
}
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
ssi := sectorbuilder.NewSortedSectorInfo(sbsi)
var seed [sectorbuilder.CommLen]byte
if copy(seed[:], r) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("random seed too short, %d bytes", len(r))
}
return s.sb.GeneratePoSt(ssi, seed, faults)
}

View File

@ -2,6 +2,7 @@ package storage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
@ -37,7 +38,16 @@ func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandle
func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
fillerSizes, err := m.secst.PieceSizesToFill(sector.SectorID) var allocated uint64
for _, piece := range sector.Pieces {
allocated += piece.Size
}
if allocated > m.sb.SectorSize() {
return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, m.sb.SectorSize())
}
fillerSizes, err := fillersFromRem(m.sb.SectorSize() - allocated)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -46,49 +56,47 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
} }
ids, err := m.storeGarbage(ctx, fillerSizes...) pieces, err := m.storeGarbage(ctx, sector.SectorID, fillerSizes...)
if err != nil { if err != nil {
return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
} }
for _, id := range ids { return func(info *SectorInfo) {
if id != sector.SectorID { info.Pieces = append(info.Pieces, pieces...)
panic("todo: pass SectorID into storeGarbage") }, nil
}
}
return nil, nil
} }
func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
log.Infow("performing sector replication...", "sector", sector.SectorID) log.Infow("performing sector replication...", "sector", sector.SectorID)
sinfo, err := m.secst.SealPreCommit(ctx, sector.SectorID) ticket, err := m.tktFn(ctx)
if err != nil {
return nil, err
}
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
if err != nil { if err != nil {
return nil, xerrors.Errorf("seal pre commit failed: %w", err) return nil, xerrors.Errorf("seal pre commit failed: %w", err)
} }
return func(info *SectorInfo) { return func(info *SectorInfo) {
info.CommD = sinfo.CommD[:] info.CommC = rspco.CommC[:]
info.CommR = sinfo.CommR[:] info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:]
info.CommRLast = rspco.CommRLast[:]
info.Ticket = SealTicket{ info.Ticket = SealTicket{
BlockHeight: sinfo.Ticket.BlockHeight, BlockHeight: ticket.BlockHeight,
TicketBytes: sinfo.Ticket.TicketBytes[:], TicketBytes: ticket.TicketBytes[:],
} }
}, nil }, nil
} }
func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
deals, err := m.secst.DealsForCommit(sector.SectorID, false)
if err != nil {
return nil, err
}
params := &actors.SectorPreCommitInfo{ params := &actors.SectorPreCommitInfo{
SectorNumber: sector.SectorID, SectorNumber: sector.SectorID,
CommR: sector.CommR, CommR: sector.CommR,
SealEpoch: sector.Ticket.BlockHeight, SealEpoch: sector.Ticket.BlockHeight,
DealIDs: deals, DealIDs: sector.deals(),
} }
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
@ -164,20 +172,20 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
return nil, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) return nil, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
} }
proof, err := m.secst.SealComputeProof(ctx, sector.SectorID, sector.RandHeight, rand) seed := sectorbuilder.SealSeed{
BlockHeight: sector.RandHeight,
}
copy(seed.TicketBytes[:], rand)
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.sb(), seed, sector.pieceInfos(), sector.refs(), sector.rspco())
if err != nil { if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err) return nil, xerrors.Errorf("computing seal proof failed: %w", err)
} }
deals, err := m.secst.DealsForCommit(sector.SectorID, true)
if err != nil {
return nil, err
}
params := &actors.SectorProveCommitInfo{ params := &actors.SectorProveCommitInfo{
Proof: proof, Proof: proof,
SectorID: sector.SectorID, SectorID: sector.SectorID,
DealIDs: deals, DealIDs: sector.deals(),
} }
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)

View File

@ -23,7 +23,7 @@ import (
"github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sector" "github.com/filecoin-project/lotus/storage"
) )
type SealSerialization uint8 type SealSerialization uint8
@ -38,7 +38,7 @@ var imBlocksPrefix = datastore.NewKey("/intermediate")
var ErrNotFound = errors.New("not found") var ErrNotFound = errors.New("not found")
type SectorBlocks struct { type SectorBlocks struct {
*sector.Store *storage.Miner
intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore
@ -47,9 +47,9 @@ type SectorBlocks struct {
keyLk sync.Mutex keyLk sync.Mutex
} }
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks { func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
sbc := &SectorBlocks{ sbc := &SectorBlocks{
Store: sectst, Miner: miner,
intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)), intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)),
@ -160,7 +160,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
} }
} }
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) { func (st *SectorBlocks) AddUnixfsPiece(ctx context.Context, ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) {
size, err := r.Size() size, err := r.Size()
if err != nil { if err != nil {
return 0, err return 0, err
@ -175,7 +175,7 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint6
pr, psize := padreader.New(r, uint64(size)) pr, psize := padreader.New(r, uint64(size))
return st.Store.AddPiece(refst.pieceRef, psize, pr, dealID) return st.Miner.SealPiece(ctx, refst.pieceRef, psize, pr, dealID)
} }
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {