add ability to control seal operations completion time

This commit is contained in:
whyrusleeping 2020-01-09 12:50:45 -08:00
parent bda85e4097
commit 7534ac1e1a
9 changed files with 118 additions and 32 deletions

View File

@ -188,7 +188,7 @@ func main() {
log.Info("Running replication...")
pieces := []sectorbuilder.PublicPieceInfo{pi}
pco, err := sb.SealPreCommit(i, ticket, pieces)
pco, err := sb.SealPreCommit(context.TODO(), i, ticket, pieces)
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
@ -206,7 +206,7 @@ func main() {
}
log.Info("Generating PoRep for sector")
proof, err := sb.SealCommit(i, ticket, seed, pieces, pco)
proof, err := sb.SealCommit(context.TODO(), i, ticket, seed, pieces, pco)
if err != nil {
return err
}

View File

@ -103,7 +103,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
switch task.Type {
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces)
rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces)
if err != nil {
return errRes(xerrors.Errorf("precomitting: %w", err))
}
@ -117,7 +117,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
case sectorbuilder.WorkerCommit:
proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
if err != nil {
return errRes(xerrors.Errorf("comitting: %w", err))
}

View File

@ -69,7 +69,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
fmt.Printf("sector-id: %d, piece info: %v", sid, pi)
pco, err := sb.SealPreCommit(sid, ticket, []sectorbuilder.PublicPieceInfo{pi})
pco, err := sb.SealPreCommit(context.TODO(), sid, ticket, []sectorbuilder.PublicPieceInfo{pi})
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}

4
go.mod
View File

@ -17,7 +17,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
github.com/filecoin-project/go-paramfetch v0.0.1
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-ole/go-ole v1.2.4 // indirect
@ -47,7 +47,7 @@ require (
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipld-cbor v0.0.3
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-log v1.0.1 // indirect
github.com/ipfs/go-log v1.0.1
github.com/ipfs/go-log/v2 v2.0.2
github.com/ipfs/go-merkledag v0.2.4
github.com/ipfs/go-path v0.0.7

10
go.sum
View File

@ -120,6 +120,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyC
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab h1:bsrBNO1LwnhOLxPEXlSPal/WuY61mLJUCHYyD0NayHg=
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107220006-3361d30ea5ab/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 h1:4IvlPad82JaNBtqh8fEAUIKWv8I3tguAJjGvUyHNZS4=
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 h1:NZXq90YlfakSmB2/84dGr0AVmKYFA97+yyViBIgTFbk=
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -283,12 +285,12 @@ github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9
github.com/ipfs/go-log v1.0.0 h1:BW3LQIiZzpNyolt84yvKNCd3FU+AK4VDw1hnHR+1aiI=
github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA=
github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log v1.0.1 h1:5lIEEOQTk/vd1WuPFBRqz2mcp+5G1fMVcW+Ib/H5Hfo=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I=
github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM=
github.com/ipfs/go-log/v2 v2.0.1 h1:mnR9XFltezAtO8A6tj5U7nKkRzhEQNEw/wT11U2HhPM=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.2 h1:xguurydRdfKMJjKyxNXNU8lYP0VZH1NUwJRwUorjuEw=
github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
@ -879,9 +881,9 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361 h1:RIIXAeV6GvDBuADKumTODatUqANFZ+5BPMnzsy4hulY=
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200108195415-316d2f248479 h1:csuS+MHeEA2eWhyjQCMaPMq4z1+/PohkBSjJZHSIbOE=
golang.org/x/tools v0.0.0-20200108195415-316d2f248479/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -79,8 +79,8 @@ type SectorBuilder interface {
AcquireSectorId() (uint64, error)
Scrub(sectorbuilder.SortedPublicSectorInfo) []*sectorbuilder.Fault
GenerateFallbackPoSt(sectorbuilder.SortedPublicSectorInfo, [sectorbuilder.CommLen]byte, []uint64) ([]sectorbuilder.EPostCandidate, []byte, error)
SealPreCommit(uint64, sectorbuilder.SealTicket, []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error)
SealCommit(uint64, sectorbuilder.SealTicket, sectorbuilder.SealSeed, []sectorbuilder.PublicPieceInfo, sectorbuilder.RawSealPreCommitOutput) ([]byte, error)
SealPreCommit(context.Context, uint64, sectorbuilder.SealTicket, []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error)
SealCommit(context.Context, uint64, sectorbuilder.SealTicket, sectorbuilder.SealSeed, []sectorbuilder.PublicPieceInfo, sectorbuilder.RawSealPreCommitOutput) ([]byte, error)
// Not so sure about these being on the interface
GetPath(string, string) (string, error)

View File

@ -1,13 +1,13 @@
package sbmock
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
"time"
"github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
@ -25,9 +25,6 @@ type SBMock struct {
nextSectorID uint64
rateLimit chan struct{}
preCommitDelay time.Duration
commitDelay time.Duration
lk sync.Mutex
}
@ -102,14 +99,34 @@ func (sb *SBMock) AcquireSectorId() (uint64, error) {
}
func (sb *SBMock) Scrub(sectorbuilder.SortedPublicSectorInfo) []*sectorbuilder.Fault {
return nil
sb.lk.Lock()
mcopy := make(map[uint64]*sectorState)
for k, v := range sb.sectors {
mcopy[k] = v
}
sb.lk.Unlock()
var out []*sectorbuilder.Fault
for sid, ss := range mcopy {
ss.lk.Lock()
if ss.failed {
out = append(out, &sectorbuilder.Fault{
SectorID: sid,
Err: fmt.Errorf("mock sector failed"),
})
}
ss.lk.Unlock()
}
return out
}
func (sb *SBMock) GenerateFallbackPoSt(sectorbuilder.SortedPublicSectorInfo, [sectorbuilder.CommLen]byte, []uint64) ([]sectorbuilder.EPostCandidate, []byte, error) {
panic("NYI")
}
func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pieces []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) {
func (sb *SBMock) SealPreCommit(ctx context.Context, sid uint64, ticket sectorbuilder.SealTicket, pieces []sectorbuilder.PublicPieceInfo) (sectorbuilder.RawSealPreCommitOutput, error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
@ -137,7 +154,7 @@ func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pie
return sectorbuilder.RawSealPreCommitOutput{}, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state")
}
time.Sleep(sb.preCommitDelay)
opFinishWait(ctx)
ss.state = statePreCommit
@ -147,7 +164,7 @@ func (sb *SBMock) SealPreCommit(sid uint64, ticket sectorbuilder.SealTicket, pie
}, nil
}
func (sb *SBMock) SealCommit(sid uint64, ticket sectorbuilder.SealTicket, seed sectorbuilder.SealSeed, pieces []sectorbuilder.PublicPieceInfo, precommit sectorbuilder.RawSealPreCommitOutput) ([]byte, error) {
func (sb *SBMock) SealCommit(ctx context.Context, sid uint64, ticket sectorbuilder.SealTicket, seed sectorbuilder.SealSeed, pieces []sectorbuilder.PublicPieceInfo, precommit sectorbuilder.RawSealPreCommitOutput) ([]byte, error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
@ -165,7 +182,7 @@ func (sb *SBMock) SealCommit(sid uint64, ticket sectorbuilder.SealTicket, seed s
return nil, xerrors.Errorf("cannot commit sector that has not been precommitted")
}
time.Sleep(sb.commitDelay)
opFinishWait(ctx)
buf := make([]byte, 32)
rand.Read(buf)
@ -202,14 +219,36 @@ func (sb *SBMock) FailSector(sid uint64) error {
return nil
}
func (sb *SBMock) SetPreCommitDelay(d time.Duration) {
sb.lk.Lock()
defer sb.lk.Unlock()
sb.preCommitDelay = d
func opFinishWait(ctx context.Context) {
val, ok := ctx.Value("opfinish").(chan struct{})
if !ok {
return
}
<-val
}
func (sb *SBMock) SetCommitDelay(d time.Duration) {
sb.lk.Lock()
defer sb.lk.Unlock()
sb.commitDelay = d
func AddOpFinish(ctx context.Context) (context.Context, func()) {
done := make(chan struct{})
return context.WithValue(ctx, "opfinish", done), func() {
close(done)
}
}
func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, error) {
usize := sectorbuilder.UserBytesForSectorSize(sb.sectorSize)
sid, err := sb.AcquireSectorId()
if err != nil {
return 0, nil, err
}
buf := make([]byte, usize)
rand.Read(buf)
pi, err := sb.AddPiece(usize, sid, bytes.NewReader(buf), nil)
if err != nil {
return 0, nil, err
}
return sid, []sectorbuilder.PublicPieceInfo{pi}, nil
}

View File

@ -0,0 +1,45 @@
package sbmock
import (
"context"
"testing"
"time"
"github.com/filecoin-project/go-sectorbuilder"
)
func TestOpFinish(t *testing.T) {
sb := NewMockSectorBuilder(1, 1024)
sid, pieces, err := sb.StageFakeData()
if err != nil {
t.Fatal(err)
}
ctx, done := AddOpFinish(context.TODO())
finished := make(chan struct{})
go func() {
_, err := sb.SealPreCommit(ctx, sid, sectorbuilder.SealTicket{}, pieces)
if err != nil {
t.Error(err)
return
}
close(finished)
}()
select {
case <-finished:
t.Fatal("should not finish until we tell it to")
case <-time.After(time.Second / 2):
}
done()
select {
case <-finished:
case <-time.After(time.Second / 2):
t.Fatal("should finish after we tell it to")
}
}

View File

@ -69,7 +69,7 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp
return sector.upd().fatal(err)
}
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
rspco, err := m.sb.SealPreCommit(ctx, sector.SectorID, *ticket, sector.pieceInfos())
if err != nil {
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
}
@ -173,7 +173,7 @@ func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sect
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Info("scheduling seal proof computation...")
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
proof, err := m.sb.SealCommit(ctx, sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil {
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
}