sectorstorage: handle restarting manager, test that

Łukasz Magiera 2020-09-17 00:35:09 +02:00
parent 5e09581256
commit d9d644b27f
9 changed files with 374 additions and 42 deletions

View File

@@ -148,12 +148,28 @@ func (t *WorkState) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
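// 164 is 0xa4, the CBOR header for a four-entry map: the ID field is added
// to the struct in this commit, bumping the count from three (0xa3 = 163)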
if _, err := w.Write([]byte{163}); err != nil {
if _, err := w.Write([]byte{164}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.ID (sectorstorage.WorkID) (struct)
if len("ID") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"ID\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("ID"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("ID")); err != nil {
return err
}
if err := t.ID.MarshalCBOR(w); err != nil {
return err
}
// t.Status (sectorstorage.WorkStatus) (string)
if len("Status") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Status\" was too long")
@@ -251,7 +267,17 @@ func (t *WorkState) UnmarshalCBOR(r io.Reader) error {
}
switch name {
// t.Status (sectorstorage.WorkStatus) (string)
// t.ID (sectorstorage.WorkID) (struct)
case "ID":
{
if err := t.ID.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.ID: %w", err)
}
}
// t.Status (sectorstorage.WorkStatus) (string)
case "Status":
{
@@ -291,3 +317,125 @@ func (t *WorkState) UnmarshalCBOR(r io.Reader) error {
return nil
}
func (t *WorkID) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
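// 162 is 0xa2: a two-entry CBOR map holding the Method and Params fields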
if _, err := w.Write([]byte{162}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.Method (string) (string)
if len("Method") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Method\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Method"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Method")); err != nil {
return err
}
if len(t.Method) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Method was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Method))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Method)); err != nil {
return err
}
// t.Params (string) (string)
if len("Params") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Params\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Params"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Params")); err != nil {
return err
}
if len(t.Params) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Params was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Params))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Params)); err != nil {
return err
}
return nil
}
func (t *WorkID) UnmarshalCBOR(r io.Reader) error {
*t = WorkID{}
br := cbg.GetPeeker(r)
scratch := make([]byte, 8)
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("WorkID: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Method (string) (string)
case "Method":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.Method = string(sval)
}
// t.Params (string) (string)
case "Params":
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
t.Params = string(sval)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
return nil
}
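A quick way to sanity-check the generated encoders is a buffer round trip. A minimal sketch (not part of this commit; the helper name is made up and bytes is assumed imported):

func roundTripWorkID(in WorkID) (WorkID, error) {
	var buf bytes.Buffer // satisfies both io.Writer and io.Reader
	if err := in.MarshalCBOR(&buf); err != nil {
		return WorkID{}, err
	}
	var out WorkID
	if err := out.UnmarshalCBOR(&buf); err != nil {
		return WorkID{}, err
	}
	return out, nil
}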

View File

@@ -43,7 +43,7 @@ type Worker interface {
// returns channel signalling worker shutdown
Closing(context.Context) (<-chan struct{}, error)
Close() error
Close() error // TODO: do we need this?
}
type SectorManager interface {
@@ -75,12 +75,12 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore
callToWork map[storiface.CallID]workID
callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
callRes map[storiface.CallID]chan result
results map[workID]result
waitRes map[workID]chan struct{}
results map[WorkID]result
waitRes map[WorkID]chan struct{}
}
type result struct {
@@ -131,13 +131,13 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
Prover: prover,
work: mss,
callToWork: map[storiface.CallID]workID{},
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},
results: map[workID]result{},
waitRes: map[workID]chan struct{}{},
results: map[WorkID]result{},
waitRes: map[WorkID]chan struct{}{},
}
// TODO: remove all non-running work from the work tracker
m.setupWorkTracker()
go m.sched.runSched()

View File

@@ -11,16 +11,16 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type workID struct {
type WorkID struct {
Method string
Params string // json [...params]
}
func (w workID) String() string {
func (w WorkID) String() string {
return fmt.Sprintf("%s(%s)", w.Method, w.Params)
}
var _ fmt.Stringer = &workID{}
var _ fmt.Stringer = &WorkID{}
type WorkStatus string
const (
@@ -30,16 +30,18 @@ const (
)
type WorkState struct {
ID WorkID
Status WorkStatus
WorkerCall storiface.CallID // Set when entering wsRunning
WorkError string // Status = wsDone, set when failed to start work
}
func newWorkID(method string, params ...interface{}) (workID, error) {
func newWorkID(method string, params ...interface{}) (WorkID, error) {
pb, err := json.Marshal(params)
if err != nil {
return workID{}, xerrors.Errorf("marshaling work params: %w", err)
return WorkID{}, xerrors.Errorf("marshaling work params: %w", err)
}
if len(pb) > 256 {
@@ -47,17 +49,55 @@ func newWorkID(method string, params ...interface{}) (workID, error) {
pb = s[:]
}
return workID{
return WorkID{
Method: method,
Params: string(pb),
}, nil
}
func (m *Manager) setupWorkTracker() {
m.workLk.Lock()
defer m.workLk.Unlock()
var ids []WorkState
if err := m.work.List(&ids); err != nil {
log.Error("getting work IDs") // quite bad
return
}
for _, st := range ids {
wid := st.ID
if err := m.work.Get(wid).Get(&st); err != nil {
log.Errorf("getting work state for %s", wid)
continue
}
switch st.Status {
case wsStarted:
log.Warnf("dropping non-running work %s", wid)
if err := m.work.Get(wid).End(); err != nil {
log.Errorf("cleannig up work state for %s", wid)
}
case wsDone:
// realistically this shouldn't ever happen as we return results
// immediately after getting them
log.Warnf("dropping done work, no result, wid %s", wid)
if err := m.work.Get(wid).End(); err != nil {
log.Errorf("cleannig up work state for %s", wid)
}
case wsRunning:
m.callToWork[st.WorkerCall] = wid
}
}
}
// returns wait=true when the task is already tracked/running
func (m *Manager) getWork(ctx context.Context, method string, params ...interface{}) (wid workID, wait bool, err error) {
func (m *Manager) getWork(ctx context.Context, method string, params ...interface{}) (wid WorkID, wait bool, err error) {
wid, err = newWorkID(method, params)
if err != nil {
return workID{}, false, xerrors.Errorf("creating workID: %w", err)
return WorkID{}, false, xerrors.Errorf("creating WorkID: %w", err)
}
m.workLk.Lock()
@@ -65,15 +105,16 @@ func (m *Manager) getWork(ctx context.Context, method string, params ...interfac
have, err := m.work.Has(wid)
if err != nil {
return workID{}, false, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
return WorkID{}, false, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
}
if !have {
err := m.work.Begin(wid, &WorkState{
ID: wid,
Status: wsStarted,
})
if err != nil {
return workID{}, false, xerrors.Errorf("failed to track task start: %w", err)
return WorkID{}, false, xerrors.Errorf("failed to track task start: %w", err)
}
return wid, false, nil
@@ -84,7 +125,7 @@ func (m *Manager) getWork(ctx context.Context, method string, params ...interfac
return wid, true, nil
}
func (m *Manager) startWork(ctx context.Context, wk workID) func(callID storiface.CallID, err error) error {
func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storiface.CallID, err error) error {
return func(callID storiface.CallID, err error) error {
m.workLk.Lock()
defer m.workLk.Unlock()
@@ -123,7 +164,7 @@ func (m *Manager) startWork(ctx context.Context, wk workID) func(callID storifac
}
}
func (m *Manager) waitWork(ctx context.Context, wid workID) (interface{}, error) {
func (m *Manager) waitWork(ctx context.Context, wid WorkID) (interface{}, error) {
m.workLk.Lock()
var ws WorkState
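Taken together, getWork, startWork and waitWork give every Manager method restart-safe call tracking. A simplified sketch of how a method is expected to wire them up (the name doTracked is hypothetical, and the real methods go through the scheduler, which is elided here):

func (m *Manager) doTracked(ctx context.Context, method string, params ...interface{}) (interface{}, error) {
	wk, wait, err := m.getWork(ctx, method, params...)
	if err != nil {
		return nil, err
	}
	if !wait {
		// schedule the task on a worker and hand the resulting CallID to
		// m.startWork(ctx, wk), which records the wsRunning state that
		// setupWorkTracker rebuilds after a manager restart
	}
	// block until a result for wk arrives (or is already cached in results)
	return m.waitWork(ctx, wk)
}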

View File

@@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"github.com/google/uuid"
@@ -83,7 +84,7 @@ func (t *testStorage) Stat(path string) (fsutil.FsStat, error) {
var _ stores.LocalStorage = &testStorage{}
func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) {
func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *stores.Local, *stores.Remote, *stores.Index) {
st := newTestStorage(t)
defer st.cleanup()
@@ -113,13 +114,15 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st
Prover: prover,
work: statestore.New(datastore.NewMapDatastore()),
callToWork: map[storiface.CallID]workID{},
work: statestore.New(ds),
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},
results: map[workID]result{},
waitRes: map[workID]chan struct{}{},
results: map[WorkID]result{},
waitRes: map[WorkID]chan struct{}{},
}
m.setupWorkTracker()
go m.sched.runSched()
return m, lstor, stor, si
@@ -129,7 +132,7 @@ func TestSimple(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx := context.Background()
m, lstor, _, _ := newTestMgr(ctx, t)
m, lstor, _, _ := newTestMgr(ctx, t, datastore.NewMapDatastore())
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
@@ -157,5 +160,113 @@ func TestSimple(t *testing.T) {
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
require.NoError(t, err)
}
func TestRedoPC1(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx := context.Background()
m, lstor, _, _ := newTestMgr(ctx, t, datastore.NewMapDatastore())
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
pieces := []abi.PieceInfo{pi, piz}
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
require.NoError(t, err)
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
require.NoError(t, err)
require.Equal(t, 2, tw.pc1s)
}
func TestRestartManager(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
m, lstor, _, _ := newTestMgr(ctx, t, ds)
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
pieces := []abi.PieceInfo{pi, piz}
ticket := abi.SealRandomness{0, 9, 9, 9, 9, 9, 9, 9}
tw.pc1lk.Lock()
tw.pc1wait = &sync.WaitGroup{}
tw.pc1wait.Add(1)
var cwg sync.WaitGroup
cwg.Add(1)
var perr error
go func() {
defer cwg.Done()
_, perr = m.SealPreCommit1(ctx, sid, ticket, pieces)
}()
tw.pc1wait.Wait()
require.NoError(t, m.Close(ctx))
tw.ret = nil
cwg.Wait()
require.Error(t, perr)
m, lstor, _, _ = newTestMgr(ctx, t, ds)
tw.ret = m // simulate jsonrpc auto-reconnect
tw.pc1lk.Unlock()
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
require.NoError(t, err)
require.Equal(t, 1, tw.pc1s)
}

View File

@@ -801,11 +801,11 @@ func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
log.Debugf("dropWorker %d", wid)
go func() {
/*go func() { // TODO: just remove?
if err := w.w.Close(); err != nil {
log.Warnf("closing worker %d: %+v", err)
}
}()
}()*/
}
}

View File

@@ -3,6 +3,7 @@ package sectorstorage
import (
"context"
"io"
"sync"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
@@ -22,6 +23,10 @@ type testWorker struct {
ret storiface.WorkerReturn
mockSeal *mock.SectorMgr
pc1s int
pc1lk sync.Mutex
pc1wait *sync.WaitGroup
}
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
@@ -55,15 +60,6 @@ func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallI
return ci, nil
}
func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
@@ -85,6 +81,24 @@ func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSiz
})
}
func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
t.pc1s++
if t.pc1wait != nil {
t.pc1wait.Done()
}
t.pc1lk.Lock()
defer t.pc1lk.Unlock()
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
panic("implement me")
}

View File

@@ -21,8 +21,7 @@ const (
type Call struct {
State CallState
// Params cbg.Deferred // TODO: support once useful
Result []byte
Result []byte // json bytes
}
func (wt *workerCallTracker) onStart(ci storiface.CallID) error {

View File

@@ -2,6 +2,7 @@ package sectorstorage
import (
"context"
"encoding/json"
"io"
"os"
"reflect"
@@ -161,9 +162,26 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ret
go func() {
res, err := work(ci)
{
rb, err := json.Marshal(res)
if err != nil {
log.Errorf("tracking call (marshaling results): %+v", err)
} else {
if err := l.ct.onDone(ci, rb); err != nil {
log.Errorf("tracking call (done): %+v", err)
}
}
}
if err := returnFunc[rt](ctx, ci, l.ret, res, err); err != nil {
log.Errorf("return error: %s: %+v", rt, err)
}
if err := l.ct.onReturned(ci); err != nil {
log.Errorf("tracking call (done): %+v", err)
}
}()
return ci, nil
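The worker-side tracker now sees three events per call. The assumed lifecycle, pieced together from this diff (the onStart call site is outside the hunks shown):

// onStart(ci)      presumably invoked when asyncCall dispatches the work
// onDone(ci, rb)   once the work function returns, storing the JSON-encoded
//                  result bytes shown being marshaled above
// onReturned(ci)   after the attempt to deliver the result via l.ret,
//                  whether or not that attempt succeeded in this version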

View File

@@ -87,6 +87,7 @@ func main() {
err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage",
sectorstorage.Call{},
sectorstorage.WorkState{},
sectorstorage.WorkID{},
)
if err != nil {
fmt.Println(err)