make sector-storage compile

Steven Allen 2020-11-04 22:34:24 -08:00 committed by Łukasz Magiera
parent 05db5ce426
commit 584907269a
7 changed files with 159 additions and 158 deletions
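
The pattern repeated across all seven files: call sites that used to take a bare abi.SectorID, with the seal proof type carried separately in a Config or WorkerConfig, now take a storage.SectorRef from specs-storage, which bundles the sector ID with its abi.RegisteredSealProof. A minimal sketch of the shape change, with purely illustrative miner, sector-number and proof values:

package example

import (
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/specs-storage/storage"
)

// Before: a bare sector ID, with the proof type held elsewhere, e.g.
//   id  := abi.SectorID{Miner: 123, Number: 1}
//   cfg := &ffiwrapper.Config{SealProofType: abi.RegisteredSealProof_StackedDrg2KiBV1}
//
// After: the proof type travels with the sector reference itself.
func newRef() storage.SectorRef {
	return storage.SectorRef{
		ID:        abi.SectorID{Miner: 123, Number: 1},
		ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
	}
}

// Anything that needs the sector size now derives it from the proof type
// instead of reading it from a stored config.
func refSize(ref storage.SectorRef) (abi.SectorSize, error) {
	return ref.ProofType.SectorSize()
}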


@ -43,7 +43,7 @@ var sectorSize, _ = sealProofType.SectorSize()
var sealRand = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}
type seal struct {
id abi.SectorID
ref storage.SectorRef
cids storage.SectorCids
pi abi.PieceInfo
ticket abi.SealRandomness
@ -56,12 +56,12 @@ func data(sn abi.SectorNumber, dlen abi.UnpaddedPieceSize) io.Reader {
)
}
func (s *seal) precommit(t *testing.T, sb *Sealer, id abi.SectorID, done func()) {
func (s *seal) precommit(t *testing.T, sb *Sealer, id storage.SectorRef, done func()) {
defer done()
dlen := abi.PaddedPieceSize(sectorSize).Unpadded()
var err error
r := data(id.Number, dlen)
r := data(id.ID.Number, dlen)
s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r)
if err != nil {
t.Fatalf("%+v", err)
@ -84,19 +84,19 @@ func (s *seal) commit(t *testing.T, sb *Sealer, done func()) {
defer done()
seed := abi.InteractiveSealRandomness{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}
pc1, err := sb.SealCommit1(context.TODO(), s.id, s.ticket, seed, []abi.PieceInfo{s.pi}, s.cids)
pc1, err := sb.SealCommit1(context.TODO(), s.ref, s.ticket, seed, []abi.PieceInfo{s.pi}, s.cids)
if err != nil {
t.Fatalf("%+v", err)
}
proof, err := sb.SealCommit2(context.TODO(), s.id, pc1)
proof, err := sb.SealCommit2(context.TODO(), s.ref, pc1)
if err != nil {
t.Fatalf("%+v", err)
}
ok, err := ProofVerifier.VerifySeal(proof2.SealVerifyInfo{
SectorID: s.id,
SectorID: s.ref.ID,
SealedCID: s.cids.Sealed,
SealProof: sealProofType,
SealProof: s.ref.ProofType,
Proof: proof,
Randomness: s.ticket,
InteractiveRandomness: seed,
@ -111,7 +111,7 @@ func (s *seal) commit(t *testing.T, sb *Sealer, done func()) {
}
}
func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.SectorID, done func()) {
func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si storage.SectorRef, done func()) {
defer done()
var b bytes.Buffer
@ -120,7 +120,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
t.Fatal(err)
}
expect, _ := ioutil.ReadAll(data(si.Number, 1016))
expect, _ := ioutil.ReadAll(data(si.ID.Number, 1016))
if !bytes.Equal(b.Bytes(), expect) {
t.Fatal("read wrong bytes")
}
@ -150,7 +150,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
t.Fatal(err)
}
expect, _ = ioutil.ReadAll(data(si.Number, 1016))
expect, _ = ioutil.ReadAll(data(si.ID.Number, 1016))
require.Equal(t, expect, b.Bytes())
b.Reset()
@ -174,13 +174,13 @@ func post(t *testing.T, sealer *Sealer, skipped []abi.SectorID, seals ...seal) {
sis := make([]proof2.SectorInfo, len(seals))
for i, s := range seals {
sis[i] = proof2.SectorInfo{
SealProof: sealProofType,
SectorNumber: s.id.Number,
SealProof: s.ref.ProofType,
SectorNumber: s.ref.ID.Number,
SealedCID: s.cids.Sealed,
}
}
proofs, skp, err := sealer.GenerateWindowPoSt(context.TODO(), seals[0].id.Miner, sis, randomness)
proofs, skp, err := sealer.GenerateWindowPoSt(context.TODO(), seals[0].ref.ID.Miner, sis, randomness)
if len(skipped) > 0 {
require.Error(t, err)
require.EqualValues(t, skipped, skp)
@ -195,7 +195,7 @@ func post(t *testing.T, sealer *Sealer, skipped []abi.SectorID, seals ...seal) {
Randomness: randomness,
Proofs: proofs,
ChallengedSectors: sis,
Prover: seals[0].id.Miner,
Prover: seals[0].ref.ID.Miner,
})
if err != nil {
t.Fatalf("%+v", err)
@ -205,7 +205,7 @@ func post(t *testing.T, sealer *Sealer, skipped []abi.SectorID, seals ...seal) {
}
}
func corrupt(t *testing.T, sealer *Sealer, id abi.SectorID) {
func corrupt(t *testing.T, sealer *Sealer, id storage.SectorRef) {
paths, done, err := sealer.sectors.AcquireSector(context.Background(), id, storiface.FTSealed, 0, storiface.PathStorage)
require.NoError(t, err)
defer done()
@ -264,14 +264,10 @@ func TestSealAndVerify(t *testing.T) {
}
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
}
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp, cfg)
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
@ -286,9 +282,12 @@ func TestSealAndVerify(t *testing.T) {
}
defer cleanup()
si := abi.SectorID{Miner: miner, Number: 1}
si := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
s := seal{id: si}
s := seal{ref: si}
start := time.Now()
@ -338,13 +337,10 @@ func TestSealPoStNoCommit(t *testing.T) {
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
}
sp := &basicfs.Provider{
Root: dir,
}
sb, err := New(sp, cfg)
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
@ -360,9 +356,12 @@ func TestSealPoStNoCommit(t *testing.T) {
}
defer cleanup()
si := abi.SectorID{Miner: miner, Number: 1}
si := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
s := seal{id: si}
s := seal{ref: si}
start := time.Now()
@ -403,13 +402,10 @@ func TestSealAndVerify3(t *testing.T) {
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
}
sp := &basicfs.Provider{
Root: dir,
}
sb, err := New(sp, cfg)
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
@ -424,13 +420,22 @@ func TestSealAndVerify3(t *testing.T) {
var wg sync.WaitGroup
si1 := abi.SectorID{Miner: miner, Number: 1}
si2 := abi.SectorID{Miner: miner, Number: 2}
si3 := abi.SectorID{Miner: miner, Number: 3}
si1 := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
si2 := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 2},
ProofType: sealProofType,
}
si3 := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 3},
ProofType: sealProofType,
}
s1 := seal{id: si1}
s2 := seal{id: si2}
s3 := seal{id: si3}
s1 := seal{ref: si1}
s2 := seal{ref: si2}
s3 := seal{ref: si3}
wg.Add(3)
go s1.precommit(t, sb, si1, wg.Done) //nolint: staticcheck
@ -451,7 +456,7 @@ func TestSealAndVerify3(t *testing.T) {
corrupt(t, sb, si1)
corrupt(t, sb, si2)
post(t, sb, []abi.SectorID{si1, si2}, s1, s2, s3)
post(t, sb, []abi.SectorID{si1.ID, si2.ID}, s1, s2, s3)
}
func BenchmarkWriteWithAlignment(b *testing.B) {
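
Concretely for the ffiwrapper tests above, the sealer constructor drops its *Config argument; the basicfs provider is enough, and the proof type reaches the sealer through each SectorRef. Under that reading of the diff, the per-test setup reduces to roughly the following, where cdir is the test's temporary directory as before:

sp := &basicfs.Provider{Root: cdir}
sb, err := New(sp) // previously New(sp, &Config{SealProofType: sealProofType})
if err != nil {
	t.Fatalf("%+v", err)
}

si := storage.SectorRef{
	ID:        abi.SectorID{Miner: miner, Number: 1},
	ProofType: sealProofType,
}
s := seal{ref: si}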


@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
@ -90,28 +91,23 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
st := newTestStorage(t)
si := stores.NewIndex()
cfg := &ffiwrapper.Config{
SealProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
lstor, err := stores.NewLocal(ctx, st, si, nil)
require.NoError(t, err)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, spt: cfg.SealProofType}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
require.NoError(t, err)
stor := stores.NewRemote(lstor, si, nil, 6000)
m := &Manager{
scfg: cfg,
ls: st,
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Local: lstor},
index: si,
sched: newScheduler(cfg.SealProofType),
sched: newScheduler(),
Prover: prover,
@ -141,12 +137,14 @@ func TestSimple(t *testing.T) {
}
err := m.AddWorker(ctx, newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m))
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
@ -176,14 +174,16 @@ func TestRedoPC1(t *testing.T) {
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
@ -228,14 +228,16 @@ func TestRestartManager(t *testing.T) {
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
@ -329,14 +331,16 @@ func TestRestartWorker(t *testing.T) {
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
apDone := make(chan struct{})
@ -363,7 +367,6 @@ func TestRestartWorker(t *testing.T) {
w = newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
@ -400,7 +403,6 @@ func TestReenableWorker(t *testing.T) {
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))


@ -27,21 +27,14 @@ var log = logging.Logger("sbmock")
type SectorMgr struct {
sectors map[abi.SectorID]*sectorState
pieces map[cid.Cid][]byte
sectorSize abi.SectorSize
nextSectorID abi.SectorNumber
proofType abi.RegisteredSealProof
lk sync.Mutex
}
type mockVerif struct{}
func NewMockSectorMgr(ssize abi.SectorSize, genesisSectors []abi.SectorID) *SectorMgr {
rt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
if err != nil {
panic(err)
}
func NewMockSectorMgr(genesisSectors []abi.SectorID) *SectorMgr {
sectors := make(map[abi.SectorID]*sectorState)
for _, sid := range genesisSectors {
sectors[sid] = &sectorState{
@ -53,9 +46,7 @@ func NewMockSectorMgr(ssize abi.SectorSize, genesisSectors []abi.SectorID) *Sect
return &SectorMgr{
sectors: sectors,
pieces: map[cid.Cid][]byte{},
sectorSize: ssize,
nextSectorID: 5,
proofType: rt,
}
}
@ -75,17 +66,17 @@ type sectorState struct {
lk sync.Mutex
}
func (mgr *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error {
func (mgr *SectorMgr) NewSector(ctx context.Context, sector storage.SectorRef) error {
return nil
}
func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorID, size, mgr.proofType)
func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorID, size, sectorID.ProofType)
var b bytes.Buffer
tr := io.TeeReader(r, &b)
c, err := ffiwrapper.GeneratePieceCIDFromFile(mgr.proofType, tr, size)
c, err := ffiwrapper.GeneratePieceCIDFromFile(sectorID.ProofType, tr, size)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err)
}
@ -95,12 +86,12 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID abi.SectorID, exist
mgr.lk.Lock()
mgr.pieces[c] = b.Bytes()
ss, ok := mgr.sectors[sectorID]
ss, ok := mgr.sectors[sectorID.ID]
if !ok {
ss = &sectorState{
state: statePacking,
}
mgr.sectors[sectorID] = ss
mgr.sectors[sectorID.ID] = ss
}
mgr.lk.Unlock()
@ -115,10 +106,6 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID abi.SectorID, exist
}, nil
}
func (mgr *SectorMgr) SectorSize() abi.SectorSize {
return mgr.sectorSize
}
func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
mgr.lk.Lock()
defer mgr.lk.Unlock()
@ -127,9 +114,9 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil
}
func (mgr *SectorMgr) ForceState(sid abi.SectorID, st int) error {
func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error {
mgr.lk.Lock()
ss, ok := mgr.sectors[sid]
ss, ok := mgr.sectors[sid.ID]
mgr.lk.Unlock()
if !ok {
return xerrors.Errorf("no sector with id %d in storage", sid)
@ -140,18 +127,23 @@ func (mgr *SectorMgr) ForceState(sid abi.SectorID, st int) error {
return nil
}
func (mgr *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
func (mgr *SectorMgr) SealPreCommit1(ctx context.Context, sid storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
mgr.lk.Lock()
ss, ok := mgr.sectors[sid]
ss, ok := mgr.sectors[sid.ID]
mgr.lk.Unlock()
if !ok {
return nil, xerrors.Errorf("no sector with id %d in storage", sid)
}
ssize, err := sid.ProofType.SectorSize()
if err != nil {
return nil, xerrors.Errorf("failed to get proof sector size: %w", err)
}
ss.lk.Lock()
defer ss.lk.Unlock()
ussize := abi.PaddedPieceSize(mgr.sectorSize).Unpadded()
ussize := abi.PaddedPieceSize(ssize).Unpadded()
// TODO: verify pieces in sinfo.pieces match passed in pieces
@ -180,7 +172,7 @@ func (mgr *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, tick
}
}
commd, err := MockVerifier.GenerateDataCommitment(mgr.proofType, pis)
commd, err := MockVerifier.GenerateDataCommitment(sid.ProofType, pis)
if err != nil {
return nil, err
}
@ -195,7 +187,7 @@ func (mgr *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, tick
return cc, nil
}
func (mgr *SectorMgr) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
func (mgr *SectorMgr) SealPreCommit2(ctx context.Context, sid storage.SectorRef, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
db := []byte(string(phase1Out))
db[0] ^= 'd'
@ -214,9 +206,9 @@ func (mgr *SectorMgr) SealPreCommit2(ctx context.Context, sid abi.SectorID, phas
}, nil
}
func (mgr *SectorMgr) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
func (mgr *SectorMgr) SealCommit1(ctx context.Context, sid storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
mgr.lk.Lock()
ss, ok := mgr.sectors[sid]
ss, ok := mgr.sectors[sid.ID]
mgr.lk.Unlock()
if !ok {
return nil, xerrors.Errorf("no such sector %d", sid)
@ -236,16 +228,16 @@ func (mgr *SectorMgr) SealCommit1(ctx context.Context, sid abi.SectorID, ticket
var out [32]byte
for i := range out {
out[i] = cids.Unsealed.Bytes()[i] + cids.Sealed.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid.Number&0xff)
out[i] = cids.Unsealed.Bytes()[i] + cids.Sealed.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid.ID.Number&0xff)
}
return out[:], nil
}
func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid storage.SectorRef, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
var out [1920]byte
for i := range out[:len(phase1Out)] {
out[i] = phase1Out[i] ^ byte(sid.Number&0xff)
out[i] = phase1Out[i] ^ byte(sid.ID.Number&0xff)
}
return out[:], nil
@ -253,10 +245,10 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1O
// Test Instrumentation Methods
func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error {
func (mgr *SectorMgr) MarkFailed(sid storage.SectorRef, failed bool) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
ss, ok := mgr.sectors[sid]
ss, ok := mgr.sectors[sid.ID]
if !ok {
return fmt.Errorf("no such sector in storage")
}
@ -265,10 +257,10 @@ func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error {
return nil
}
func (mgr *SectorMgr) MarkCorrupted(sid abi.SectorID, corrupted bool) error {
func (mgr *SectorMgr) MarkCorrupted(sid storage.SectorRef, corrupted bool) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
ss, ok := mgr.sectors[sid]
ss, ok := mgr.sectors[sid.ID]
if !ok {
return fmt.Errorf("no such sector in storage")
}
@ -353,55 +345,62 @@ func generateFakePoSt(sectorInfo []proof2.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
if len(mgr.sectors[sectorID].pieces) > 1 || offset != 0 {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
if len(mgr.sectors[sectorID.ID].pieces) > 1 || offset != 0 {
panic("implme")
}
_, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID].pieces[0]]), int64(size))
_, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID.ID].pieces[0]]), int64(size))
return err
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) {
usize := abi.PaddedPieceSize(mgr.sectorSize).Unpadded()
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
psize, err := spt.SectorSize()
if err != nil {
return storage.SectorRef{}, nil, err
}
usize := abi.PaddedPieceSize(psize).Unpadded()
sid, err := mgr.AcquireSectorNumber()
if err != nil {
return abi.SectorID{}, nil, err
return storage.SectorRef{}, nil, err
}
buf := make([]byte, usize)
_, _ = rand.Read(buf) // nolint:gosec
id := abi.SectorID{
Miner: mid,
Number: sid,
id := storage.SectorRef{
ID: abi.SectorID{
Miner: mid,
Number: sid,
},
ProofType: spt,
}
pi, err := mgr.AddPiece(context.TODO(), id, nil, usize, bytes.NewReader(buf))
if err != nil {
return abi.SectorID{}, nil, err
return storage.SectorRef{}, nil, err
}
return id, []abi.PieceInfo{pi}, nil
}
func (mgr *SectorMgr) FinalizeSector(context.Context, abi.SectorID, []storage.Range) error {
func (mgr *SectorMgr) FinalizeSector(context.Context, storage.SectorRef, []storage.Range) error {
return nil
}
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
return nil
}
func (mgr *SectorMgr) Remove(ctx context.Context, sector abi.SectorID) error {
func (mgr *SectorMgr) Remove(ctx context.Context, sector storage.SectorRef) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
if _, has := mgr.sectors[sector]; !has {
if _, has := mgr.sectors[sector.ID]; !has {
return xerrors.Errorf("sector not found")
}
delete(mgr.sectors, sector)
delete(mgr.sectors, sector.ID)
return nil
}
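
With sectorSize and proofType removed from the mock's state, each method recomputes what it needs from the ref it is handed: the sector size comes from the ref's proof type, and the usable piece space from the padded-to-unpadded conversion in go-state-types. A small helper along these lines (the name is made up for illustration; the values in the comment assume the 2KiB test proof):

// unpaddedSpace reports how many unpadded bytes fit in a sector of the
// given proof type, e.g. 2048 - 2048/128 = 2032 for the 2KiB test proof.
func unpaddedSpace(ref storage.SectorRef) (abi.UnpaddedPieceSize, error) {
	ssize, err := ref.ProofType.SectorSize()
	if err != nil {
		return 0, err
	}
	return abi.PaddedPieceSize(ssize).Unpadded(), nil
}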


@ -9,9 +9,9 @@ import (
)
func TestOpFinish(t *testing.T) {
sb := NewMockSectorMgr(2048, nil)
sb := NewMockSectorMgr(nil)
sid, pieces, err := sb.StageFakeData(123)
sid, pieces, err := sb.StageFakeData(123, abi.RegisteredSealProof_StackedDrg2KiBV1_1)
if err != nil {
t.Fatal(err)
}


@ -47,55 +47,55 @@ type schedTestWorker struct {
session uuid.UUID
}
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
func (s *schedTestWorker) Remove(ctx context.Context, sector storage.SectorRef) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
func (s *schedTestWorker) NewSector(ctx context.Context, sector storage.SectorRef) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
func (s *schedTestWorker) Fetch(ctx context.Context, id storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
panic("implement me")
}
@ -165,8 +165,7 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
}
func TestSchedStartStop(t *testing.T) {
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
sched := newScheduler(spt)
sched := newScheduler()
go sched.runSched()
addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
@ -211,12 +210,15 @@ func TestSched(t *testing.T) {
go func() {
defer rm.wg.Done()
sectorNum := abi.SectorID{
Miner: 8,
Number: sid,
sectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 8,
Number: sid,
},
ProofType: spt,
}
err := sched.Schedule(ctx, sectorNum, taskType, sel, func(ctx context.Context, w Worker) error {
err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
@ -286,7 +288,7 @@ func TestSched(t *testing.T) {
return func(t *testing.T) {
index := stores.NewIndex()
sched := newScheduler(spt)
sched := newScheduler()
sched.testSync = make(chan struct{})
go sched.runSched()
@ -518,7 +520,6 @@ func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b
var _ WorkerSelector = slowishSelector(true)
func BenchmarkTrySched(b *testing.B) {
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
logging.SetAllLoggers(logging.LevelInfo)
defer logging.SetAllLoggers(logging.LevelDebug)
ctx := context.Background()
@ -528,7 +529,7 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
sched := newScheduler(spt)
sched := newScheduler()
sched.workers[WorkerID{}] = &workerHandle{
workerRpc: nil,
info: storiface.WorkerInfo{
@ -568,9 +569,8 @@ func BenchmarkTrySched(b *testing.B) {
}
func TestWindowCompact(t *testing.T) {
sh := scheduler{
spt: abi.RegisteredSealProof_StackedDrg32GiBV1,
}
sh := scheduler{}
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) {
return func(t *testing.T) {
@ -585,7 +585,7 @@ func TestWindowCompact(t *testing.T) {
for _, task := range windowTasks {
window.todo = append(window.todo, &workerRequest{taskType: task})
window.allocated.add(wh.info.Resources, ResourceTable[task][sh.spt])
window.allocated.add(wh.info.Resources, ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@ -604,7 +604,7 @@ func TestWindowCompact(t *testing.T) {
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
expectRes.add(wh.info.Resources, ResourceTable[task][sh.spt])
expectRes.add(wh.info.Resources, ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)


@ -31,50 +31,50 @@ func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID,
panic("implement me")
}
func (t *testExec) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
func (t *testExec) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}
func (t *testExec) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
func (t *testExec) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
panic("implement me")
}
func (t *testExec) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
func (t *testExec) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
panic("implement me")
}
func (t *testExec) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
func (t *testExec) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storage.Proof, error) {
panic("implement me")
}
func (t *testExec) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
func (t *testExec) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
panic("implement me")
}
func (t *testExec) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (t *testExec) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
panic("implement me")
}
func (t *testExec) Remove(ctx context.Context, sector abi.SectorID) error {
func (t *testExec) Remove(ctx context.Context, sector storage.SectorRef) error {
panic("implement me")
}
func (t *testExec) NewSector(ctx context.Context, sector abi.SectorID) error {
func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) error {
panic("implement me")
}
func (t *testExec) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
func (t *testExec) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
resp := make(chan apres)
t.apch <- resp
ar := <-resp
return ar.pi, ar.err
}
func (t *testExec) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
func (t *testExec) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
panic("implement me")
}
func (t *testExec) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
func (t *testExec) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
panic("implement me")
}


@ -31,11 +31,6 @@ type testWorker struct {
}
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
ssize, err := wcfg.SealProof.SectorSize()
if err != nil {
panic(err)
}
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
@ -46,15 +41,15 @@ func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerR
lstor: lstor,
ret: ret,
mockSeal: mock.NewMockSectorMgr(ssize, nil),
mockSeal: mock.NewMockSectorMgr(nil),
session: uuid.New(),
}
}
func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallID)) (storiface.CallID, error) {
func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface.CallID)) (storiface.CallID, error) {
ci := storiface.CallID{
Sector: sector,
Sector: sector.ID,
ID: uuid.New(),
}
@ -63,7 +58,7 @@ func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallI
return ci, nil
}
func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
if err := t.ret.ReturnAddPiece(ctx, ci, p, errstr(err)); err != nil {
@ -72,7 +67,7 @@ func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSiz
})
}
func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
t.pc1s++
@ -90,7 +85,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ti
})
}
func (t *testWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil {
log.Error(err)