Merge pull request #3350 from filecoin-project/fix/some-fsm-issues

Fix some fsm issues
This commit is contained in:
Łukasz Magiera 2020-08-29 02:16:59 +02:00 committed by GitHub
commit 953effcc43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 866 additions and 221 deletions

View File

@ -325,7 +325,7 @@ type FullNode interface {
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
// StateSectorPreCommitInfo returns the PreCommit info for the specified miner's sector
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
// StateSectorGetInfo returns the on-chain info for the specified miner's sector
// StateSectorGetInfo returns the on-chain info for the specified miner's sector. Returns null in case the sector info isn't found
// NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate
// expiration epoch
StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error)

View File

@ -120,16 +120,18 @@ type SectorLog struct {
}
type SectorInfo struct {
SectorID abi.SectorNumber
State SectorState
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
Deals []abi.DealID
Ticket SealTicket
Seed SealSeed
Retries uint64
ToUpgrade bool
SectorID abi.SectorNumber
State SectorState
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
Deals []abi.DealID
Ticket SealTicket
Seed SealSeed
PreCommitMsg *cid.Cid
CommitMsg *cid.Cid
Retries uint64
ToUpgrade bool
LastErr string

View File

@ -37,6 +37,10 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B
return nil
}
if len(h.BeaconEntries) == 0 {
return xerrors.Errorf("expected to have beacon entries in this block, but didn't find any")
}
last := h.BeaconEntries[len(h.BeaconEntries)-1]
if last.Round != maxRound {
return xerrors.Errorf("expected final beacon entry in block to be at round %d, got %d", maxRound, last.Round)

View File

@ -157,7 +157,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres
return nil, err
}
if !ok {
return nil, xerrors.New("sector not found")
return nil, nil
}
return sectorInfo, nil

View File

@ -236,8 +236,10 @@ var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgRed, state: sealing.UndefinedSectorState},
{col: color.FgYellow, state: sealing.Empty},
{col: color.FgYellow, state: sealing.Packing},
{col: color.FgYellow, state: sealing.PreCommit1},
{col: color.FgYellow, state: sealing.PreCommit2},
@ -245,9 +247,13 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommitWait},
{col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.FinalizeSector},
{col: color.FgCyan, state: sealing.Removing},
{col: color.FgCyan, state: sealing.Removed},
{col: color.FgRed, state: sealing.FailedUnrecoverable},
{col: color.FgRed, state: sealing.SealPreCommit1Failed},
{col: color.FgRed, state: sealing.SealPreCommit2Failed},
@ -259,6 +265,9 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.Faulty},
{col: color.FgRed, state: sealing.FaultReported},
{col: color.FgRed, state: sealing.FaultedFinal},
{col: color.FgRed, state: sealing.RemoveFailed},
{col: color.FgRed, state: sealing.DealsExpired},
{col: color.FgRed, state: sealing.RecoverDealIDs},
}
func init() {

View File

@ -155,6 +155,9 @@ var sealingJobsCmd = &cli.Command{
// oldest first
sort.Slice(lines, func(i, j int) bool {
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
return lines[i].Start.Before(lines[j].Start)
})
@ -170,10 +173,14 @@ var sealingJobsCmd = &cli.Command{
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tTime\n")
_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n")
for _, l := range lines {
_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
state := "running"
if l.RunWait != 0 {
state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
}
_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
}
return tw.Flush()

View File

@ -97,6 +97,8 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("TicketH:\t%d\n", status.Ticket.Epoch)
fmt.Printf("Seed:\t\t%x\n", status.Seed.Value)
fmt.Printf("SeedH:\t\t%d\n", status.Seed.Epoch)
fmt.Printf("Precommit:\t%s\n", status.PreCommitMsg)
fmt.Printf("Commit:\t\t%s\n", status.CommitMsg)
fmt.Printf("Proof:\t\t%x\n", status.Proof)
fmt.Printf("Deals:\t\t%v\n", status.Deals)
fmt.Printf("Retries:\t%d\n", status.Retries)

View File

@ -3642,7 +3642,7 @@ Response:
```
### StateSectorGetInfo
StateSectorGetInfo returns the on-chain info for the specified miner's sector
StateSectorGetInfo returns the on-chain info for the specified miner's sector. Returns null in case the sector info isn't found
NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate
expiration epoch

View File

@ -6,8 +6,7 @@ import (
"io"
"net/http"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
@ -17,6 +16,7 @@ import (
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@ -463,25 +463,19 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
unsealed := stores.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}
var err error
if len(unsealedStores) == 0 { // can be already removed
unsealed = stores.FTNone
}
if rerr := m.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false)
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.Remove(ctx, sector)
})
return err
}
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

View File

@ -7,6 +7,11 @@ type requestQueue []*workerRequest
func (q requestQueue) Len() int { return len(q) }
func (q requestQueue) Less(i, j int) bool {
oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType)
if oneMuchLess {
return muchLess
}
if q[i].priority != q[j].priority {
return q[i].priority > q[j].priority
}

View File

@ -22,17 +22,17 @@ func (r Resources) MultiThread() bool {
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
sealtasks.TTAddPiece: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ // This is probably a bit conservative
MaxMemory: 64 << 30,
MinMemory: 64 << 30,
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
MaxMemory: 8 << 30,
MinMemory: 8 << 30,
Threads: 1,
BaseMinMemory: 1 << 30,
},
abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ // This is probably a bit conservative
MaxMemory: 32 << 30,
MinMemory: 32 << 30,
abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
MaxMemory: 4 << 30,
MinMemory: 4 << 30,
Threads: 1,

View File

@ -21,6 +21,7 @@ type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
var SelectorTimeout = 5 * time.Second
var InitWait = 3 * time.Second
var (
SchedWindows = 2
@ -85,6 +86,9 @@ type workerHandle struct {
lk sync.Mutex
wndLk sync.Mutex
activeWindows []*schedWindow
// stats / tracking
wt *workTracker
@ -123,6 +127,8 @@ type workerRequest struct {
prepare WorkerAction
work WorkerAction
start time.Time
index int // The index of the item in the heap.
indexHeap int
@ -147,7 +153,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
workerClosing: make(chan WorkerID),
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest),
windowRequests: make(chan *schedWindowRequest, 20),
schedQueue: &requestQueue{},
@ -171,6 +177,8 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType
prepare: prepare,
work: work,
start: time.Now(),
ret: ret,
ctx: ctx,
}:
@ -214,7 +222,12 @@ func (sh *scheduler) runSched() {
go sh.runWorkerWatcher()
iw := time.After(InitWait)
var initialised bool
for {
var doSched bool
select {
case w := <-sh.newWorkers:
sh.newWorker(w)
@ -224,22 +237,47 @@ func (sh *scheduler) runSched() {
case req := <-sh.schedule:
sh.schedQueue.Push(req)
sh.trySched()
doSched = true
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())
case <-iw:
initialised = true
iw = nil
doSched = true
case <-sh.closing:
sh.schedClose()
return
}
if doSched && initialised {
// First gather any pending tasks, so we go through the scheduling loop
// once for every added task
loop:
for {
select {
case req := <-sh.schedule:
sh.schedQueue.Push(req)
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
default:
break loop
}
}
sh.trySched()
}
}
}
@ -321,7 +359,7 @@ func (sh *scheduler) trySched() {
}
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) {
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
continue
}
@ -392,11 +430,11 @@ func (sh *scheduler) trySched() {
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) {
continue
}
log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd)
windows[wnd].allocated.add(wr, needRes)
@ -475,8 +513,6 @@ func (sh *scheduler) runWorker(wid WorkerID) {
taskDone := make(chan struct{}, 1)
windowsRequested := 0
var activeWindows []*schedWindow
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
@ -510,7 +546,9 @@ func (sh *scheduler) runWorker(wid WorkerID) {
select {
case w := <-scheduledWindows:
activeWindows = append(activeWindows, w)
worker.wndLk.Lock()
worker.activeWindows = append(worker.activeWindows, w)
worker.wndLk.Unlock()
case <-taskDone:
log.Debugw("task done", "workerid", wid)
case <-sh.closing:
@ -521,24 +559,38 @@ func (sh *scheduler) runWorker(wid WorkerID) {
return
}
worker.wndLk.Lock()
windowsRequested -= sh.workerCompactWindows(worker, wid)
assignLoop:
// process windows in order
for len(activeWindows) > 0 {
// process tasks within a window in order
for len(activeWindows[0].todo) > 0 {
todo := activeWindows[0].todo[0]
needRes := ResourceTable[todo.taskType][sh.spt]
for len(worker.activeWindows) > 0 {
firstWindow := worker.activeWindows[0]
// process tasks within a window, preferring tasks at lower indexes
for len(firstWindow.todo) > 0 {
sh.workersLk.RLock()
tidx := -1
worker.lk.Lock()
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][sh.spt]
if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) {
tidx = t
break
}
}
worker.lk.Unlock()
if !ok {
if tidx == -1 {
sh.workersLk.RUnlock()
break assignLoop
}
todo := firstWindow.todo[tidx]
log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.RUnlock()
@ -548,19 +600,76 @@ func (sh *scheduler) runWorker(wid WorkerID) {
go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
}
activeWindows[0].todo = activeWindows[0].todo[1:]
// Note: we're not freeing window.allocated resources here very much on purpose
copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
firstWindow.todo[len(firstWindow.todo)-1] = nil
firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
}
copy(activeWindows, activeWindows[1:])
activeWindows[len(activeWindows)-1] = nil
activeWindows = activeWindows[:len(activeWindows)-1]
copy(worker.activeWindows, worker.activeWindows[1:])
worker.activeWindows[len(worker.activeWindows)-1] = nil
worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
windowsRequested--
}
worker.wndLk.Unlock()
}
}()
}
func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
// move tasks from older windows to newer windows if older windows
// still can fit them
if len(worker.activeWindows) > 1 {
for wi, window := range worker.activeWindows[1:] {
lower := worker.activeWindows[wi]
var moved []int
for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][sh.spt]
if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) {
continue
}
moved = append(moved, ti)
lower.todo = append(lower.todo, todo)
lower.allocated.add(worker.info.Resources, needRes)
window.allocated.free(worker.info.Resources, needRes)
}
if len(moved) > 0 {
newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved))
for i, t := range window.todo {
if moved[0] == i {
moved = moved[1:]
continue
}
newTodo = append(newTodo, t)
}
window.todo = newTodo
}
}
}
var compacted int
var newWindows []*schedWindow
for _, window := range worker.activeWindows {
if len(window.todo) == 0 {
compacted++
continue
}
newWindows = append(newWindows, window)
}
worker.activeWindows = newWindows
return compacted
}
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
needRes := ResourceTable[req.taskType][sh.spt]

View File

@ -7,7 +7,7 @@ import (
)
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, wr) {
for !a.canHandleRequest(r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@ -52,37 +52,37 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
a.memUsedMax -= r.MaxMemory
}
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool {
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib)
return false
}
maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
return false
}
if needRes.MultiThread() {
if a.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs)
return false
}
} else {
if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs)
return false
}
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if a.gpuUsed {
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller)
return false
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"runtime"
"sort"
"sync"
"testing"
"time"
@ -22,6 +23,10 @@ import (
"github.com/filecoin-project/specs-storage/storage"
)
func init() {
InitWait = 10 * time.Millisecond
}
func TestWithPriority(t *testing.T) {
ctx := context.Background()
@ -301,7 +306,8 @@ func TestSched(t *testing.T) {
done: map[string]chan struct{}{},
}
for _, task := range tasks {
for i, task := range tasks {
log.Info("TASK", i)
task(t, sched, index, &rm)
}
@ -415,6 +421,45 @@ func TestSched(t *testing.T) {
)
}
diag := func() task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
time.Sleep(20 * time.Millisecond)
for _, request := range s.diag().Requests {
log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)
}
wj := (&Manager{sched: s}).WorkerJobs()
type line struct {
storiface.WorkerJob
wid uint64
}
lines := make([]line, 0)
for wid, jobs := range wj {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: wid,
})
}
}
// oldest first
sort.Slice(lines, func(i, j int) bool {
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
return lines[i].Start.Before(lines[j].Start)
})
for _, l := range lines {
log.Infof("!!! wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task)
}
}
}
// run this one a bunch of times, it had a very annoying tendency to fail randomly
for i := 0; i < 40; i++ {
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
@ -423,6 +468,8 @@ func TestSched(t *testing.T) {
// fill queues
twoPC1("w0", 0, taskStarted),
twoPC1("w1", 2, taskNotScheduled),
sched("w2", "fred", 4, sealtasks.TTPreCommit1),
taskNotScheduled("w2"),
// windowed
@ -435,10 +482,18 @@ func TestSched(t *testing.T) {
sched("t3", "fred", 10, sealtasks.TTPreCommit2),
taskNotScheduled("t3"),
diag(),
twoPC1Act("w0", taskDone),
twoPC1Act("w1", taskStarted),
taskNotScheduled("w2"),
twoPC1Act("w1", taskDone),
taskStarted("w2"),
taskDone("w2"),
diag(),
taskStarted("t3"),
taskNotScheduled("t1"),
@ -518,3 +573,68 @@ func BenchmarkTrySched(b *testing.B) {
b.Run("1w-500q", test(1, 500))
b.Run("200w-400q", test(200, 400))
}
func TestWindowCompact(t *testing.T) {
sh := scheduler{
spt: abi.RegisteredSealProof_StackedDrg32GiBV1,
}
test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) {
return func(t *testing.T) {
wh := &workerHandle{
info: storiface.WorkerInfo{
Resources: decentWorkerResources,
},
}
for _, windowTasks := range start {
window := &schedWindow{}
for _, task := range windowTasks {
window.todo = append(window.todo, &workerRequest{taskType: task})
window.allocated.add(wh.info.Resources, ResourceTable[task][sh.spt])
}
wh.activeWindows = append(wh.activeWindows, window)
}
n := sh.workerCompactWindows(wh, 0)
require.Equal(t, len(start)-len(expect), n)
for wi, tasks := range expect {
var expectRes activeResources
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
expectRes.add(wh.info.Resources, ResourceTable[task][sh.spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi)
require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi)
require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi)
}
}
}
t.Run("2-pc1-windows", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
)
t.Run("1-window", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
)
t.Run("2-pc2-windows", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}),
)
t.Run("2pc1-pc1ap", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1}}),
)
}

View File

@ -17,15 +17,15 @@ const (
)
var order = map[TaskType]int{
TTAddPiece: 7,
TTPreCommit1: 6,
TTPreCommit2: 5,
TTCommit2: 4,
TTCommit1: 3,
TTFetch: 2,
TTFinalize: 1,
TTUnseal: 0,
TTReadUnsealed: 0,
TTAddPiece: 6, // least priority
TTPreCommit1: 5,
TTPreCommit2: 4,
TTCommit2: 3,
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTReadUnsealed: -1,
TTFinalize: -2, // most priority
}
var shortNames = map[TaskType]string{
@ -43,6 +43,12 @@ var shortNames = map[TaskType]string{
TTReadUnsealed: "RD ",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {
oa, ob := order[a], order[b]
oneNegative := oa^ob < 0
return oneNegative, oa < ob
}
func (a TaskType) Less(b TaskType) bool {
return order[a] < order[b]
}

View File

@ -1,6 +1,8 @@
package sectorstorage
import "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
m.sched.workersLk.RLock()
@ -29,6 +31,20 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
for id, handle := range m.sched.workers {
out[uint64(id)] = handle.wt.Running()
handle.wndLk.Lock()
for wi, window := range handle.activeWindows {
for _, request := range window.todo {
out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{
ID: 0,
Sector: request.sector,
Task: request.taskType,
RunWait: wi + 1,
Start: request.start,
})
}
}
handle.wndLk.Unlock()
}
return out

View File

@ -37,5 +37,6 @@ type WorkerJob struct {
Sector abi.SectorID
Task sealtasks.TaskType
Start time.Time
RunWait int // 0 - running, 1+ - assigned
Start time.Time
}

View File

@ -135,12 +135,34 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{163}); err != nil {
if _, err := w.Write([]byte{164}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.PublishCid (cid.Cid) (struct)
if len("PublishCid") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"PublishCid\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PublishCid"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("PublishCid")); err != nil {
return err
}
if t.PublishCid == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCidBuf(scratch, w, *t.PublishCid); err != nil {
return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err)
}
}
// t.DealID (abi.DealID) (uint64)
if len("DealID") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"DealID\" was too long")
@ -224,7 +246,30 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error {
}
switch name {
// t.DealID (abi.DealID) (uint64)
// t.PublishCid (cid.Cid) (struct)
case "PublishCid":
{
b, err := br.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := br.UnreadByte(); err != nil {
return err
}
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err)
}
t.PublishCid = &c
}
}
// t.DealID (abi.DealID) (uint64)
case "DealID":
{
@ -430,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{182}); err != nil {
if _, err := w.Write([]byte{183}); err != nil {
return err
}
@ -860,6 +905,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.Return (sealing.ReturnState) (string)
if len("Return") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Return\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Return"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Return")); err != nil {
return err
}
if len(t.Return) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Return was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Return))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Return)); err != nil {
return err
}
// t.LastErr (string) (string)
if len("LastErr") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"LastErr\" was too long")
@ -1362,6 +1430,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
}
}
// t.Return (sealing.ReturnState) (string)
case "Return":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.Return = ReturnState(sval)
}
// t.LastErr (string) (string)
case "LastErr":

View File

@ -33,7 +33,7 @@ type ErrInvalidProof struct{ error }
type ErrNoPrecommit struct{ error }
type ErrCommitWaitFailed struct{ error }
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) error {
tok, height, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
@ -55,6 +55,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
}
if proposal.Provider != maddr {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.Provider, maddr)}
}
if proposal.PieceCID != p.Piece.PieceCID {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)}
}
@ -74,6 +78,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
// checkPrecommit checks that data commitment generated in the sealing process
// matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) {
if err := checkPieces(ctx, maddr, si, api); err != nil {
return err
}
commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok)
if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
@ -176,5 +184,9 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")}
}
if err := checkPieces(ctx, m.maddr, si, m.api); err != nil {
return err
}
return nil
}

View File

@ -17,9 +17,9 @@ import (
)
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, err := m.plan(events, user.(*SectorInfo))
next, processed, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
return nil, uint64(len(events)), err
return nil, processed, err
}
return func(ctx statemachine.Context, si SectorInfo) error {
@ -30,10 +30,10 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
}
return nil
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
}, processed, nil // TODO: This processed event count is not very correct
}
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
// Sealing
UndefinedSectorState: planOne(
@ -49,31 +49,39 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPackingFailed{}, PackingFailed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
PreCommit2: planOne(
on(SectorPreCommit2{}, PreCommitting),
on(SectorSealPreCommit2Failed{}, SealPreCommit2Failed),
on(SectorPackingFailed{}, PackingFailed),
),
PreCommitting: planOne(
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitted{}, PreCommitWait),
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
PreCommitWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorRetryPreCommit{}, PreCommitting),
),
WaitSeed: planOne(
on(SectorSeedReady{}, Committing),
on(SectorChainPreCommitFailed{}, PreCommitFailed),
),
Committing: planCommitting,
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed),
),
CommitWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
FinalizeSector: planOne(
@ -95,6 +103,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryWaitSeed{}, WaitSeed),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
@ -109,22 +119,33 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
),
PackingFailed: planOne(), // TODO: Deprecated, remove
DealsExpired: planOne(
// SectorRemove (global)
),
RecoverDealIDs: planOne(
onReturning(SectorUpdateDealIDs{}),
),
// Post-seal
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorRemove{}, Removing),
),
Removing: planOne(
on(SectorRemoved{}, Removed),
on(SectorRemoveFailed{}, RemoveFailed),
),
RemoveFailed: planOne(
// SectorRemove (global)
),
Faulty: planOne(
on(SectorFaultReported{}, FaultReported),
),
@ -133,7 +154,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Removed: final,
}
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) {
/////
// First process all events
@ -170,11 +191,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
p := fsmPlanners[state.State]
if p == nil {
return nil, xerrors.Errorf("planner for state %s not found", state.State)
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)
}
if err := p(events, state); err != nil {
return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
processed, err := p(events, state)
if err != nil {
return nil, 0, xerrors.Errorf("running planner for state %s failed: %w", state.State, err)
}
/////
@ -182,47 +204,50 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
/*
* Empty <- incoming deals
| |
| v
*<- WaitDeals <- incoming deals
| |
| v
*<- Packing <- incoming committed capacity
| |
| v
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
|
v
FailedUnrecoverable
* Empty <- incoming deals
| |
| v
*<- WaitDeals <- incoming deals
| |
| v
*<- Packing <- incoming committed capacity
| |
| v
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
| v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed
| | ||
| v /-------/|
* PreCommitting <-----+---> PreCommitFailed
| | | ^
| v | |
*<- WaitSeed -----------+-----/
| ||| ^ |
| ||| \--------*-----/
| ||| |
| vvv v----+----> ComputeProofFailed
*<- Committing |
| | ^--> CommitFailed
| v ^
| SubmitCommit |
| | |
| v |
*<- CommitWait ---/
| |
| v
| FinalizeSector <--> FinalizeFailed
| |
| v
*<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
*/
@ -235,51 +260,63 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case WaitDeals:
log.Infof("Waiting for deals %d", state.SectorNumber)
case Packing:
return m.handlePacking, nil
return m.handlePacking, processed, nil
case PreCommit1:
return m.handlePreCommit1, nil
return m.handlePreCommit1, processed, nil
case PreCommit2:
return m.handlePreCommit2, nil
return m.handlePreCommit2, processed, nil
case PreCommitting:
return m.handlePreCommitting, nil
return m.handlePreCommitting, processed, nil
case PreCommitWait:
return m.handlePreCommitWait, nil
return m.handlePreCommitWait, processed, nil
case WaitSeed:
return m.handleWaitSeed, nil
return m.handleWaitSeed, processed, nil
case Committing:
return m.handleCommitting, nil
return m.handleCommitting, processed, nil
case SubmitCommit:
return m.handleSubmitCommit, processed, nil
case CommitWait:
return m.handleCommitWait, nil
return m.handleCommitWait, processed, nil
case FinalizeSector:
return m.handleFinalizeSector, nil
return m.handleFinalizeSector, processed, nil
// Handled failure modes
case SealPreCommit1Failed:
return m.handleSealPrecommit1Failed, nil
return m.handleSealPrecommit1Failed, processed, nil
case SealPreCommit2Failed:
return m.handleSealPrecommit2Failed, nil
return m.handleSealPrecommit2Failed, processed, nil
case PreCommitFailed:
return m.handlePreCommitFailed, nil
return m.handlePreCommitFailed, processed, nil
case ComputeProofFailed:
return m.handleComputeProofFailed, nil
return m.handleComputeProofFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, nil
return m.handleCommitFailed, processed, nil
case FinalizeFailed:
return m.handleFinalizeFailed, nil
return m.handleFinalizeFailed, processed, nil
case PackingFailed: // DEPRECATED: remove this for the next reset
state.State = DealsExpired
fallthrough
case DealsExpired:
return m.handleDealsExpired, processed, nil
case RecoverDealIDs:
return m.handleRecoverDealIDs, processed, nil
// Post-seal
case Proving:
return m.handleProvingSector, nil
return m.handleProvingSector, processed, nil
case Removing:
return m.handleRemoving, nil
return m.handleRemoving, processed, nil
case Removed:
return nil, nil
return nil, processed, nil
case RemoveFailed:
return m.handleRemoveFailed, processed, nil
// Faults
case Faulty:
return m.handleFaulty, nil
return m.handleFaulty, processed, nil
case FaultReported:
return m.handleFaultReported, nil
return m.handleFaultReported, processed, nil
// Fatal errors
case UndefinedSectorState:
@ -290,28 +327,29 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
log.Errorf("unexpected sector update state: %s", state.State)
}
return nil, nil
return nil, processed, nil
}
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
for _, event := range events {
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
for i, event := range events {
switch e := event.User.(type) {
case globalMutator:
if e.applyGlobal(state) {
return nil
return uint64(i + 1), nil
}
case SectorCommitted: // the normal case
e.apply(state)
state.State = CommitWait
state.State = SubmitCommit
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
continue // or it didn't!
}
log.Warnf("planCommitting: commit Seed changed")
e.apply(state)
state.State = Committing
return nil
return uint64(i + 1), nil
case SectorComputeProofFailed:
state.State = ComputeProofFailed
case SectorSealPreCommit1Failed:
@ -321,10 +359,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
case SectorRetryCommitWait:
state.State = CommitWait
default:
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
return uint64(i), xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
}
}
return nil
return uint64(len(events)), nil
}
func (m *Sealing) restartSectors(ctx context.Context) error {
@ -365,31 +403,38 @@ func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, sta
return m.sectors.Send(id, SectorForceState{state})
}
func final(events []statemachine.Event, state *SectorInfo) error {
return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}
func on(mut mutator, next SectorState) func() (mutator, SectorState) {
return func() (mutator, SectorState) {
return mut, next
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
state.State = next
return nil
}
}
}
func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
return func(events []statemachine.Event, state *SectorInfo) error {
if len(events) != 1 {
for _, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return nil
}
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
if state.Return == "" {
return xerrors.Errorf("return state not set")
}
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", state.State, events)
}
state.State = SectorState(state.Return)
state.Return = ""
return nil
}
}
}
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
return nil
return 1, nil
}
for _, t := range ts {
@ -404,15 +449,14 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
}
events[0].User.(mutator).apply(state)
state.State = next
return nil
return 1, next(state)
}
_, ok := events[0].User.(Ignorable)
if ok {
return nil
return 1, nil
}
return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
}
}

View File

@ -101,10 +101,6 @@ func (evt SectorPacked) apply(state *SectorInfo) {
}
}
type SectorPackingFailed struct{ error }
func (evt SectorPackingFailed) apply(*SectorInfo) {}
type SectorPreCommit1 struct {
PreCommit1Out storage.PreCommit1Out
TicketValue abi.SealRandomness
@ -191,13 +187,28 @@ type SectorCommitFailed struct{ error }
func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorCommitFailed) apply(*SectorInfo) {}
type SectorRetrySubmitCommit struct{}
func (evt SectorRetrySubmitCommit) apply(*SectorInfo) {}
type SectorDealsExpired struct{ error }
func (evt SectorDealsExpired) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorDealsExpired) apply(*SectorInfo) {}
type SectorCommitted struct {
Message cid.Cid
Proof []byte
Proof []byte
}
func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
type SectorCommitSubmitted struct {
Message cid.Cid
}
func (evt SectorCommitSubmitted) apply(state *SectorInfo) {
state.CommitMessage = &evt.Message
}
@ -256,6 +267,24 @@ type SectorRetryCommitWait struct{}
func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}
type SectorInvalidDealIDs struct {
Return ReturnState
}
func (evt SectorInvalidDealIDs) apply(state *SectorInfo) {
state.Return = evt.Return
}
type SectorUpdateDealIDs struct {
Updates map[int]abi.DealID
}
func (evt SectorUpdateDealIDs) apply(state *SectorInfo) {
for i, id := range evt.Updates {
state.Pieces[i].DealInfo.DealID = id
}
}
// Faults
type SectorFaulty struct{}
@ -274,7 +303,10 @@ type SectorFaultedFinal struct{}
type SectorRemove struct{}
func (evt SectorRemove) apply(state *SectorInfo) {}
func (evt SectorRemove) applyGlobal(state *SectorInfo) bool {
state.State = Removing
return true
}
type SectorRemoved struct{}

View File

@ -16,7 +16,7 @@ func init() {
}
func (t *test) planSingle(evt interface{}) {
_, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state)
_, _, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state)
require.NoError(t.t, err)
}
@ -58,6 +58,9 @@ func TestHappyPath(t *testing.T) {
require.Equal(m.t, m.state.State, Committing)
m.planSingle(SectorCommitted{})
require.Equal(m.t, m.state.State, SubmitCommit)
m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorProving{})
@ -98,13 +101,16 @@ func TestSeedRevert(t *testing.T) {
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, Committing)
_, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
_, _, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, Committing)
// not changing the seed this time
_, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
_, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state)
require.NoError(t, err)
require.Equal(m.t, m.state.State, SubmitCommit)
m.planSingle(SectorCommitSubmitted{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorProving{})
@ -129,7 +135,8 @@ func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) {
events := []statemachine.Event{{User: SectorCommitFailed{}}}
require.NoError(t, planCommitting(events, m.state))
_, err := planCommitting(events, m.state)
require.NoError(t, err)
require.Equal(t, CommitFailed, m.state.State)
}

View File

@ -141,7 +141,7 @@ func (m *Sealing) Stop(ctx context.Context) error {
return m.sectors.Stop(ctx)
}
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d", d.DealID)
log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid)
if (padreader.PaddedSize(uint64(size))) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}

View File

@ -10,12 +10,13 @@ const (
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit2
PreCommitting SectorState = "PreCommitting" // on chain pre-commit
PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing"
CommitWait SectorState = "CommitWait" // waiting for message to land on chain
Committing SectorState = "Committing" // compute PoRep
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
// error modes
@ -25,8 +26,10 @@ const (
PreCommitFailed SectorState = "PreCommitFailed"
ComputeProofFailed SectorState = "ComputeProofFailed"
CommitFailed SectorState = "CommitFailed"
PackingFailed SectorState = "PackingFailed"
PackingFailed SectorState = "PackingFailed" // TODO: deprecated, remove
FinalizeFailed SectorState = "FinalizeFailed"
DealsExpired SectorState = "DealsExpired"
RecoverDealIDs SectorState = "RecoverDealIDs"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain

View File

@ -1,12 +1,18 @@
package sealing
import (
"bytes"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)
const minRetryTime = 1 * time.Minute
@ -81,6 +87,11 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitFailed})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
case *ErrPrecommitOnChain:
@ -88,6 +99,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
case *ErrSectorNumberAllocated:
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
// TODO: check if the sector is committed (not sure how we'd end up here)
// TODO: check on-chain state, adjust local sector number counter to not give out allocated numbers
return nil
default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
@ -157,7 +169,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrExpiredTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case nil:
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
case *ErrPrecommitOnChain:
@ -192,6 +209,11 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorRetryPreCommitWait{})
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrCommitWaitFailed:
if err := failedCooldown(ctx, sector); err != nil {
return err
@ -221,3 +243,120 @@ func (m *Sealing) handleFinalizeFailed(ctx statemachine.Context, sector SectorIn
return ctx.Send(SectorRetryFinalize{})
}
func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRemove{})
}
func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error {
// First make vary sure the sector isn't committed
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
}
if si != nil {
// TODO: this should never happen, but in case it does, try to go back to
// the proving state after running some checks
return xerrors.Errorf("sector is committed on-chain, but we're in DealsExpired")
}
if sector.PreCommitInfo == nil {
// TODO: Create a separate state which will remove those pieces, and go back to PC1
log.Errorf("non-precommitted sector with expired deals, can't recover from this yet")
}
// Not much to do here, we can't go back in time to commit this sector
return ctx.Send(SectorRemove{})
}
func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
var toFix []int
for i, p := range sector.Pieces {
// if no deal is associated with the piece, ensure that we added it as
// filler (i.e. ensure that it has a zero PieceCID)
if p.DealInfo == nil {
exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded())
if !p.Piece.PieceCID.Equals(exp) {
return xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sector.SectorNumber, i, p.Piece.PieceCID)
}
continue
}
proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok)
if err != nil {
log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err)
toFix = append(toFix, i)
continue
}
if proposal.Provider != m.maddr {
log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr)
toFix = append(toFix, i)
continue
}
if proposal.PieceCID != p.Piece.PieceCID {
log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)
toFix = append(toFix, i)
continue
}
if p.Piece.Size != proposal.PieceSize {
log.Warnf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize)
toFix = append(toFix, i)
continue
}
if height >= proposal.StartEpoch {
// TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces
// (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval)
return xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)
}
}
updates := map[int]abi.DealID{}
for _, i := range toFix {
p := sector.Pieces[i]
if p.DealInfo.PublishCid == nil {
// TODO: check if we are in an early enough state try to remove this piece
log.Error("can't fix sector deals: piece %d (of %d) of sector %d has nil DealInfo.PublishCid (refers to deal %d)", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID)
// Not much to do here (and this can only happen for old spacerace sectors)
return ctx.Send(SectorRemove{})
}
ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid)
if err != nil {
return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err)
}
if ml.Receipt.ExitCode != exitcode.Ok {
return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode)
}
var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil {
return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}
if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}
updates[i] = retval.IDs[0]
}
// Not much to do here, we can't go back in time to commit this sector
return ctx.Send(SectorUpdateDealIDs{Updates: updates})
}

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/specs-storage/storage"
)
@ -79,15 +80,16 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
}
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector, m.api); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)})
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommit1})
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)})
return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)})
default:
return xerrors.Errorf("checkPieces sanity check error: %w", err)
}
@ -155,6 +157,11 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrPrecommitOnChain:
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
case *ErrSectorNumberAllocated:
@ -226,11 +233,18 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorChainPreCommitFailed{err})
}
if mw.Receipt.ExitCode != 0 {
switch mw.Receipt.ExitCode {
case exitcode.Ok:
// this is what we expect
case exitcode.SysErrOutOfGas:
// gas estimator guessed a wrong number
return ctx.Send(SectorRetryPreCommit{})
default:
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
return ctx.Send(SectorChainPreCommitFailed{err})
}
log.Info("precommit message landed on chain: ", sector.SectorNumber)
return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSetTok})
@ -326,21 +340,25 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
return ctx.Send(SectorCommitted{
Proof: proof,
})
}
func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
// TODO: Consider splitting states and persist proof for faster recovery
params := &miner.ProveCommitSectorParams{
SectorNumber: sector.SectorNumber,
Proof: proof,
Proof: sector.Proof,
}
enc := new(bytes.Buffer)
@ -372,14 +390,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
collateral = big.Zero()
}
// TODO: check seed / ticket are up to date
// TODO: check seed / ticket / deals are up to date
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
return ctx.Send(SectorCommitted{
Proof: proof,
return ctx.Send(SectorCommitSubmitted{
Message: mcid,
})
}
@ -395,13 +412,22 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
}
if mw.Receipt.ExitCode != 0 {
switch mw.Receipt.ExitCode {
case exitcode.Ok:
// this is what we expect
case exitcode.SysErrOutOfGas:
// gas estimator guessed a wrong number
return ctx.Send(SectorRetrySubmitCommit{})
default:
return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)})
}
_, err = m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok)
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok)
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron: %w", err)})
return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)})
}
if si == nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron")})
}
return ctx.Send(SectorProving{})

View File

@ -30,6 +30,7 @@ type Piece struct {
// DealInfo is a tuple of deal identity and its schedule
type DealInfo struct {
PublishCid *cid.Cid
DealID abi.DealID
DealSchedule DealSchedule
KeepUnsealed bool
@ -53,6 +54,15 @@ type Log struct {
Kind string
}
type ReturnState string
const (
RetPreCommit1 = ReturnState(PreCommit1)
RetPreCommitting = ReturnState(PreCommitting)
RetPreCommitFailed = ReturnState(PreCommitFailed)
RetCommitFailed = ReturnState(CommitFailed)
)
type SectorInfo struct {
State SectorState
SectorNumber abi.SectorNumber
@ -90,6 +100,9 @@ type SectorInfo struct {
// Faults
FaultReportMsg *cid.Cid
// Recovery
Return ReturnState
// Debug
LastErr string

View File

@ -72,6 +72,10 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC
log.Errorf("error calling StateSectorGetInfo for replaced sector: %+v", err)
return big.Zero()
}
if ri == nil {
log.Errorf("couldn't find sector info for sector to replace: %+v", replace)
return big.Zero()
}
if params.Expiration < ri.Expiration {
// TODO: Some limit on this

2
go.mod
View File

@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.6.2
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.5.7
github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6

4
go.sum
View File

@ -247,8 +247,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814234959-80b1788108ac/go.mod h1:umicPCaN99ysHTiYOmwhuLxTFbOwcsI+mdw/t96vvM4=
github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E=
github.com/filecoin-project/go-fil-markets v0.5.7 h1:kzyMHqez8ssxchj5s9M1hkC3CTwRGh2MeglJGfUksQU=
github.com/filecoin-project/go-fil-markets v0.5.7/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw=
github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465 h1:74yonPhkVakfqUHcgfJ+vQOfCJQNiUBKn8XN9Z6F0S0=
github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200817153016-2ea5cbaf5ec0/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 h1:FXtCp0ybqdQL9knb3OGDpkNTaBbPxgkqPeWKotUwkH0=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=

View File

@ -87,8 +87,13 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) {
if deal.PublishCid == nil {
return nil, xerrors.Errorf("deal.PublishCid can't be nil")
}
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sealing.DealInfo{
DealID: deal.DealID,
DealID: deal.DealID,
PublishCid: deal.PublishCid,
DealSchedule: sealing.DealSchedule{
StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch,
EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch,
@ -351,7 +356,7 @@ func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetTo
}
func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error {
receipt, err := n.StateWaitMsg(ctx, mcid, build.MessageConfidence)
receipt, err := n.StateWaitMsg(ctx, mcid, 2*build.MessageConfidence)
if err != nil {
return cb(0, nil, err)
}

View File

@ -153,8 +153,10 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
Value: info.SeedValue,
Epoch: info.SeedEpoch,
},
Retries: info.InvalidProofs,
ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid),
PreCommitMsg: info.PreCommitMessage,
CommitMsg: info.CommitMessage,
Retries: info.InvalidProofs,
ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid),
LastErr: info.LastErr,
Log: log,
@ -175,6 +177,9 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
if err != nil {
return sInfo, err
}
if onChainInfo == nil {
return sInfo, nil
}
sInfo.SealProof = onChainInfo.SealProof

View File

@ -102,6 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize
return 0, 0, err
}
// TODO: DealID has very low finality here
err = st.writeRef(d.DealID, sn, offset, size)
if err != nil {
return 0, 0, xerrors.Errorf("writeRef: %w", err)