worker: Redeclare storage on reconnect

This commit is contained in:
Łukasz Magiera 2020-09-28 21:06:49 +02:00
parent 4ba7af6061
commit 810c767200
3 changed files with 86 additions and 23 deletions

View File

@ -300,7 +300,7 @@ type StorageMinerStruct struct {
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin" retry:"true"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, storiface.SectorFileType, bool) error `perm:"admin"`
StorageDropSector func(context.Context, stores.ID, abi.SectorID, storiface.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, storiface.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`

View File

@ -426,12 +426,21 @@ var runCmd = &cli.Command{
}
go func() {
var reconnect bool
for {
log.Info("Making sure no local tasks are running")
// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
workerApi.LocalWorker.WaitQuiet()
if reconnect {
if err := localStore.Redeclare(ctx); err != nil {
log.Errorf("Redeclaring local storage failed: %+v", err)
cancel()
return
}
}
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
@ -455,6 +464,8 @@ var runCmd = &cli.Command{
}
log.Errorf("LOTUS-MINER CONNECTION LOST")
reconnect = true
}
}()

View File

@ -178,6 +178,78 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("declaring storage in index: %w", err)
}
if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore); err != nil {
return err
}
st.paths[meta.ID] = out
return nil
}
func (st *Local) open(ctx context.Context) error {
cfg, err := st.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("getting local storage config: %w", err)
}
for _, path := range cfg.StoragePaths {
err := st.OpenPath(ctx, path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
}
go st.reportHealth(ctx)
return nil
}
func (st *Local) Redeclare(ctx context.Context) error {
st.localLk.Lock()
defer st.localLk.Unlock()
for id, p := range st.paths {
mb, err := ioutil.ReadFile(filepath.Join(p.local, MetaFile))
if err != nil {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}
var meta LocalStorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
}
fst, err := p.stat(st.localStorage)
if err != nil {
return err
}
if id != meta.ID {
log.Errorf("storage path ID changed: %s; %s -> %s", p.local, id, meta.ID)
continue
}
err = st.index.StorageAttach(ctx, StorageInfo{
ID: id,
URLs: st.urls,
Weight: meta.Weight,
CanSeal: meta.CanSeal,
CanStore: meta.CanStore,
}, fst)
if err != nil {
return xerrors.Errorf("redeclaring storage in index: %w", err)
}
if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore); err != nil {
return xerrors.Errorf("redeclaring sectors: %w", err)
}
}
return nil
}
func (st *Local) declareSectors(ctx context.Context, p string, id ID, primary bool) error {
for _, t := range storiface.PathTypes {
ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
if err != nil {
@ -201,32 +273,12 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t, meta.CanStore); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err)
if err := st.index.StorageDeclareSector(ctx, id, sid, t, primary); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, id, err)
}
}
}
st.paths[meta.ID] = out
return nil
}
func (st *Local) open(ctx context.Context) error {
cfg, err := st.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("getting local storage config: %w", err)
}
for _, path := range cfg.StoragePaths {
err := st.OpenPath(ctx, path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
}
go st.reportHealth(ctx)
return nil
}