diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 558906963..6bd212c78 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -188,7 +188,7 @@ func main() { log.Info("Running replication...") pieces := []sectorbuilder.PublicPieceInfo{pi} - pco, err := sb.SealPreCommit(i, ticket, pieces) + pco, err := sb.SealPreCommit(context.TODO(), i, ticket, pieces) if err != nil { return xerrors.Errorf("commit: %w", err) } @@ -206,7 +206,7 @@ func main() { } log.Info("Generating PoRep for sector") - proof, err := sb.SealCommit(i, ticket, seed, pieces, pco) + proof, err := sb.SealCommit(context.TODO(), i, ticket, seed, pieces, pco) if err != nil { return err } diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index d79a8ed83..1043aa5ea 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -103,7 +103,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) switch task.Type { case sectorbuilder.WorkerPreCommit: - rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces) + rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces) if err != nil { return errRes(xerrors.Errorf("precomitting: %w", err)) } @@ -117,7 +117,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } case sectorbuilder.WorkerCommit: - proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) + proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) if err != nil { return errRes(xerrors.Errorf("comitting: %w", err)) } diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index 0b62c38bb..42f864785 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -69,7 +69,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb fmt.Printf("sector-id: %d, piece info: %v", sid, pi) - pco, err := sb.SealPreCommit(sid, ticket, []sectorbuilder.PublicPieceInfo{pi}) + pco, err := sb.SealPreCommit(context.TODO(), sid, ticket, []sectorbuilder.PublicPieceInfo{pi}) if err != nil { return nil, xerrors.Errorf("commit: %w", err) } diff --git a/go.mod b/go.mod index 0c6325b02..ee7c4d175 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab + github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect @@ -47,7 +47,7 @@ require ( github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-cbor v0.0.3 github.com/ipfs/go-ipld-format v0.0.2 - github.com/ipfs/go-log v1.0.1 // indirect + github.com/ipfs/go-log v1.0.1 github.com/ipfs/go-log/v2 v2.0.2 github.com/ipfs/go-merkledag v0.2.4 github.com/ipfs/go-path v0.0.7 diff --git a/go.sum b/go.sum index 5fd3476af..b31495db1 100644 --- a/go.sum +++ b/go.sum @@ -120,6 +120,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyC github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab h1:bsrBNO1LwnhOLxPEXlSPal/WuY61mLJUCHYyD0NayHg= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= +github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 h1:4IvlPad82JaNBtqh8fEAUIKWv8I3tguAJjGvUyHNZS4= +github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 h1:NZXq90YlfakSmB2/84dGr0AVmKYFA97+yyViBIgTFbk= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -283,12 +285,12 @@ github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9 github.com/ipfs/go-log v1.0.0 h1:BW3LQIiZzpNyolt84yvKNCd3FU+AK4VDw1hnHR+1aiI= github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA= github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo= -github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= -github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM= -github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo= github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= +github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM= +github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM= +github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.2 h1:xguurydRdfKMJjKyxNXNU8lYP0VZH1NUwJRwUorjuEw= github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= @@ -879,9 +881,9 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216173652-a0e659d51361 h1:RIIXAeV6GvDBuADKumTODatUqANFZ+5BPMnzsy4hulY= golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200108195415-316d2f248479 h1:csuS+MHeEA2eWhyjQCMaPMq4z1+/PohkBSjJZHSIbOE= golang.org/x/tools v0.0.0-20200108195415-316d2f248479/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/storage/miner.go b/storage/miner.go index ed2b8876f..90c4d9fac 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -79,8 +79,8 @@ type SectorBuilder interface { AcquireSectorId() (uint64, error) Scrub(sectorbuilder.SortedPublicSectorInfo) []*sectorbuilder.Fault GenerateFallbackPoSt(sectorbuilder.SortedPublicSectorInfo, [sectorbuilder.CommLen]byte, []uint64) ([]sectorbuilder.EPostCandidate, []byte, error) - SealPreCommit(uint64, sectorbuilder.SealTicket, []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) - SealCommit(uint64, sectorbuilder.SealTicket, sectorbuilder.SealSeed, []sectorbuilder.PublicPieceInfo, sectorbuilder.RawSealPreCommitOutput) ([]byte, error) + SealPreCommit(context.Context, uint64, sectorbuilder.SealTicket, []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) + SealCommit(context.Context, uint64, sectorbuilder.SealTicket, sectorbuilder.SealSeed, []sectorbuilder.PublicPieceInfo, sectorbuilder.RawSealPreCommitOutput) ([]byte, error) // Not so sure about these being on the interface GetPath(string, string) (string, error) diff --git a/storage/sbmock/sbmock.go b/storage/sbmock/sbmock.go index 45f83d76d..73905db28 100644 --- a/storage/sbmock/sbmock.go +++ b/storage/sbmock/sbmock.go @@ -1,13 +1,13 @@ package sbmock import ( + "bytes" "context" "fmt" "io" "io/ioutil" "math/rand" "sync" - "time" "github.com/filecoin-project/go-sectorbuilder" "golang.org/x/xerrors" @@ -25,9 +25,6 @@ type SBMock struct { nextSectorID uint64 rateLimit chan struct{} - preCommitDelay time.Duration - commitDelay time.Duration - lk sync.Mutex } @@ -102,14 +99,34 @@ func (sb *SBMock) AcquireSectorId() (uint64, error) { } func (sb *SBMock) Scrub(sectorbuilder.SortedPublicSectorInfo) []*sectorbuilder.Fault { - return nil + sb.lk.Lock() + mcopy := make(map[uint64]*sectorState) + for k, v := range sb.sectors { + mcopy[k] = v + } + sb.lk.Unlock() + + var out []*sectorbuilder.Fault + for sid, ss := range mcopy { + ss.lk.Lock() + if ss.failed { + out = append(out, §orbuilder.Fault{ + SectorID: sid, + Err: fmt.Errorf("mock sector failed"), + }) + + } + ss.lk.Unlock() + } + + return out } func (sb *SBMock) GenerateFallbackPoSt(sectorbuilder.SortedPublicSectorInfo, [sectorbuilder.CommLen]byte, []uint64) ([]sectorbuilder.EPostCandidate, []byte, error) { panic("NYI") } -func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pieces []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) { +func (sb *SBMock) SealPreCommit(ctx context.Context, sid uint64, ticket sectorbuilder.SealTicket, pieces []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) { sb.lk.Lock() ss, ok := sb.sectors[sid] sb.lk.Unlock() @@ -137,7 +154,7 @@ func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pie return sectorbuilder.RawSealPreCommitOutput{}, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state") } - time.Sleep(sb.preCommitDelay) + opFinishWait(ctx) ss.state = statePreCommit @@ -147,7 +164,7 @@ func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pie }, nil } -func (sb *SBMock) SealCommit(sid uint64, ticket sectorbuilder.SealTicket, seed sectorbuilder.SealSeed, pieces []sectorbuilder.PublicPieceInfo, precommit sectorbuilder.RawSealPreCommitOutput) ([]byte, error) { +func (sb *SBMock) SealCommit(ctx context.Context, sid uint64, ticket sectorbuilder.SealTicket, seed sectorbuilder.SealSeed, pieces []sectorbuilder.PublicPieceInfo, precommit sectorbuilder.RawSealPreCommitOutput) ([]byte, error) { sb.lk.Lock() ss, ok := sb.sectors[sid] sb.lk.Unlock() @@ -165,7 +182,7 @@ func (sb *SBMock) SealCommit(sid uint64, ticket sectorbuilder.SealTicket, seed s return nil, xerrors.Errorf("cannot commit sector that has not been precommitted") } - time.Sleep(sb.commitDelay) + opFinishWait(ctx) buf := make([]byte, 32) rand.Read(buf) @@ -202,14 +219,36 @@ func (sb *SBMock) FailSector(sid uint64) error { return nil } -func (sb *SBMock) SetPreCommitDelay(d time.Duration) { - sb.lk.Lock() - defer sb.lk.Unlock() - sb.preCommitDelay = d +func opFinishWait(ctx context.Context) { + val, ok := ctx.Value("opfinish").(chan struct{}) + if !ok { + return + } + <-val } -func (sb *SBMock) SetCommitDelay(d time.Duration) { - sb.lk.Lock() - defer sb.lk.Unlock() - sb.commitDelay = d +func AddOpFinish(ctx context.Context) (context.Context, func()) { + done := make(chan struct{}) + + return context.WithValue(ctx, "opfinish", done), func() { + close(done) + } +} + +func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, error) { + usize := sectorbuilder.UserBytesForSectorSize(sb.sectorSize) + sid, err := sb.AcquireSectorId() + if err != nil { + return 0, nil, err + } + + buf := make([]byte, usize) + rand.Read(buf) + + pi, err := sb.AddPiece(usize, sid, bytes.NewReader(buf), nil) + if err != nil { + return 0, nil, err + } + + return sid, []sectorbuilder.PublicPieceInfo{pi}, nil } diff --git a/storage/sbmock/sbmock_test.go b/storage/sbmock/sbmock_test.go new file mode 100644 index 000000000..d07a4eee4 --- /dev/null +++ b/storage/sbmock/sbmock_test.go @@ -0,0 +1,45 @@ +package sbmock + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-sectorbuilder" +) + +func TestOpFinish(t *testing.T) { + sb := NewMockSectorBuilder(1, 1024) + + sid, pieces, err := sb.StageFakeData() + if err != nil { + t.Fatal(err) + } + + ctx, done := AddOpFinish(context.TODO()) + + finished := make(chan struct{}) + go func() { + _, err := sb.SealPreCommit(ctx, sid, sectorbuilder.SealTicket{}, pieces) + if err != nil { + t.Error(err) + return + } + + close(finished) + }() + + select { + case <-finished: + t.Fatal("should not finish until we tell it to") + case <-time.After(time.Second / 2): + } + + done() + + select { + case <-finished: + case <-time.After(time.Second / 2): + t.Fatal("should finish after we tell it to") + } +} diff --git a/storage/sector_states.go b/storage/sector_states.go index 0b86a3059..f3c2a4b44 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -69,7 +69,7 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp return sector.upd().fatal(err) } - rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos()) + rspco, err := m.sb.SealPreCommit(ctx, sector.SectorID, *ticket, sector.pieceInfos()) if err != nil { return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err)) } @@ -173,7 +173,7 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate { log.Info("scheduling seal proof computation...") - proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) + proof, err := m.sb.SealCommit(ctx, sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) if err != nil { return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err)) }