workers: Return unfinished tasks on restart

This commit is contained in:
Łukasz Magiera 2020-09-22 00:52:33 +02:00
parent aa5bd7bc17
commit 03c3d8bdb3
6 changed files with 283 additions and 24 deletions

View File

@ -17,12 +17,51 @@ func (t *Call) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{162}); err != nil {
if _, err := w.Write([]byte{164}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.ID (storiface.CallID) (struct)
if len("ID") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"ID\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("ID"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("ID")); err != nil {
return err
}
if err := t.ID.MarshalCBOR(w); err != nil {
return err
}
// t.RetType (sectorstorage.ReturnType) (string)
if len("RetType") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RetType\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("RetType"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RetType")); err != nil {
return err
}
if len(t.RetType) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.RetType was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.RetType))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.RetType)); err != nil {
return err
}
// t.State (sectorstorage.CallState) (uint64)
if len("State") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"State\" was too long")
@ -98,7 +137,28 @@ func (t *Call) UnmarshalCBOR(r io.Reader) error {
}
switch name {
// t.State (sectorstorage.CallState) (uint64)
// t.ID (storiface.CallID) (struct)
case "ID":
{
if err := t.ID.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.ID: %w", err)
}
}
// t.RetType (sectorstorage.ReturnType) (string)
case "RetType":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.RetType = ReturnType(sval)
}
// t.State (sectorstorage.CallState) (uint64)
case "State":
{

View File

@ -11,6 +11,7 @@ import (
"strings"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
@ -203,6 +204,7 @@ func TestRedoPC1(t *testing.T) {
require.Equal(t, 2, tw.pc1s)
}
// Manager restarts in the middle of a task, restarts it, it completes
func TestRestartManager(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
@ -262,6 +264,8 @@ func TestRestartManager(t *testing.T) {
m, lstor, _, _ = newTestMgr(ctx, t, ds)
tw.ret = m // simulate jsonrpc auto-reconnect
err = m.AddWorker(ctx, tw)
require.NoError(t, err)
tw.pc1lk.Unlock()
@ -270,3 +274,68 @@ func TestRestartManager(t *testing.T) {
require.Equal(t, 1, tw.pc1s)
}
// Worker restarts in the middle of a task, task fails after restart:
// the restarted worker finds the unfinished call in its statestore and
// reports a "worker restarted" error back to the manager, which the
// blocked AddPiece caller then observes.
func TestRestartWorker(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
m, lstor, stor, idx := newTestMgr(ctx, t, ds)
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
// separate datastore for the worker's call tracker — it must survive the
// simulated worker restart below so the unfinished call can be found
wds := datastore.NewMapDatastore()
// arch lets the test intercept AddPiece: the stub executor blocks here
arch := make(chan chan apres)
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
apDone := make(chan struct{})
go func() {
defer close(apDone)
// expected to fail: the worker dies mid-call and the restarted worker
// errors the task out instead of completing it
_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.Error(t, err)
}()
// kill the worker
<-arch
require.NoError(t, w.Close())
// wait for the manager to notice the worker is gone
for {
if len(m.WorkerStats()) == 0 {
break
}
time.Sleep(time.Millisecond * 3)
}
// restart the worker
w = newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
err = m.AddWorker(ctx, w)
require.NoError(t, err)
// unblocks once the AddPiece goroutine has seen the expected error
<-apDone
}

View File

@ -0,0 +1,81 @@
package sectorstorage
import (
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// apres carries the outcome of a test-controlled AddPiece call.
type apres struct {
pi abi.PieceInfo
err error
}
// testExec is a stub ffiwrapper.Storage implementation for restart tests;
// every method panics except AddPiece, which blocks until the test pushes
// a result through apch.
type testExec struct {
apch chan chan apres
}
// The methods below are not exercised by the restart tests; they panic so
// any unexpected code path fails loudly rather than silently succeeding.
func (t *testExec) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
panic("implement me")
}
func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) {
panic("implement me")
}
func (t *testExec) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}
func (t *testExec) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
panic("implement me")
}
func (t *testExec) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
panic("implement me")
}
func (t *testExec) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
panic("implement me")
}
func (t *testExec) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
panic("implement me")
}
func (t *testExec) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
panic("implement me")
}
func (t *testExec) Remove(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (t *testExec) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
// AddPiece hands control to the test harness: it sends a fresh result
// channel on apch and blocks until the harness pushes an apres back,
// relaying that piece info / error to the caller.
func (t *testExec) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
	done := make(chan apres)
	t.apch <- done

	res := <-done
	return res.pi, res.err
}
// UnsealPiece is not exercised by the restart tests.
func (t *testExec) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
panic("implement me")
}
// ReadPiece is not exercised by the restart tests.
func (t *testExec) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
panic("implement me")
}
var _ ffiwrapper.Storage = &testExec{}

View File

@ -19,13 +19,18 @@ const (
)
// Call is the persisted record of an async worker call; it lets a
// restarted worker discover tasks that never completed and report them
// back as errors.
type Call struct {
ID storiface.CallID
// RetType selects the matching entry in returnFunc when the call has
// to be errored out after a restart.
RetType ReturnType
State CallState
Result []byte // json bytes
}
func (wt *workerCallTracker) onStart(ci storiface.CallID) error {
func (wt *workerCallTracker) onStart(ci storiface.CallID, rt ReturnType) error {
return wt.st.Begin(ci, &Call{
ID: ci,
RetType:rt,
State: CallStarted,
})
}
@ -43,3 +48,8 @@ func (wt *workerCallTracker) onReturned(ci storiface.CallID) error {
st := wt.st.Get(ci)
return st.End()
}
// unfinished lists every call still present in the tracker's statestore,
// i.e. calls that were started but never reported as returned.
func (wt *workerCallTracker) unfinished() ([]Call, error) {
	var calls []Call
	err := wt.st.List(&calls)
	return calls, err
}

View File

@ -38,18 +38,21 @@ type LocalWorker struct {
localStore *stores.Local
sindex stores.SectorIndex
ret storiface.WorkerReturn
executor func() (ffiwrapper.Storage, error)
ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
closing chan struct{}
}
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}
return &LocalWorker{
w := &LocalWorker{
scfg: &ffiwrapper.Config{
SealProofType: wcfg.SealProof,
},
@ -62,7 +65,37 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
st: cst,
},
acceptTasks: acceptTasks,
executor: executor,
closing: make(chan struct{}),
}
if w.executor == nil {
w.executor = w.ffiExec
}
unfinished, err := w.ct.unfinished()
if err != nil {
log.Errorf("reading unfinished tasks: %+v", err)
return w
}
go func() {
for _, call := range unfinished {
err := xerrors.Errorf("worker restarted")
if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil {
log.Errorf("return error: %s: %+v", call.RetType, err)
}
}
}()
return w
}
// NewLocalWorker constructs a LocalWorker backed by the default ffi-based
// executor (a nil executor makes newLocalWorker fall back to w.ffiExec).
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst)
}
type localWorkerPathProvider struct {
@ -101,11 +134,11 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
}, nil
}
func (l *LocalWorker) sb() (ffiwrapper.Storage, error) {
func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
}
type returnType string
type ReturnType string
// in: func(WorkerReturn, context.Context, CallID, err string)
// in: func(WorkerReturn, context.Context, CallID, ret T, err string)
@ -123,7 +156,12 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
var ro []reflect.Value
if withRet {
ro = rf.Call([]reflect.Value{rwr, rctx, rci, reflect.ValueOf(i), rerr})
ret := reflect.ValueOf(i)
if i == nil {
ret = reflect.Zero(rf.Type().In(3))
}
ro = rf.Call([]reflect.Value{rwr, rctx, rci, ret, rerr})
} else {
ro = rf.Call([]reflect.Value{rwr, rctx, rci, rerr})
}
@ -136,7 +174,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
}
}
var returnFunc = map[returnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{
"AddPiece": rfunc(storiface.WorkerReturn.ReturnAddPiece),
"SealPreCommit1": rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
"SealPreCommit2": rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
@ -150,13 +188,13 @@ var returnFunc = map[returnType]func(context.Context, storiface.CallID, storifac
"Fetch": rfunc(storiface.WorkerReturn.ReturnFetch),
}
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt returnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
ci := storiface.CallID{
Sector: sector,
ID: uuid.New(),
}
if err := l.ct.onStart(ci); err != nil {
if err := l.ct.onStart(ci, rt); err != nil {
log.Errorf("tracking call (start): %+v", err)
}
@ -196,7 +234,7 @@ func errstr(err error) string {
}
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return err
}
@ -205,7 +243,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -240,7 +278,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t
}
}
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return nil, err
}
@ -250,7 +288,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t
}
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -261,7 +299,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p
}
func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -272,7 +310,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick
}
func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -283,7 +321,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas
}
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -330,7 +368,7 @@ func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, type
}
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -353,7 +391,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
sb, err := l.sb()
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
@ -405,10 +443,11 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}
func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return make(chan struct{}), nil
return l.closing, nil
}
// Close signals shutdown to everything waiting on Closing().
// NOTE(review): calling Close twice panics (double close of l.closing) —
// confirm callers close at most once, or guard with sync.Once.
func (l *LocalWorker) Close() error {
close(l.closing)
return nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/go-statestore"
"net/http"
"time"
@ -43,6 +42,7 @@ import (
"github.com/filecoin-project/go-multistore"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
@ -50,15 +50,15 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/markets"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/miner"