diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 8968abf40..f80adbef7 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -105,6 +105,8 @@ var runCmd = &cli.Command{ } defer closer() ctx := lcli.ReqContext(cctx) + ctx, cancel := context.WithCancel(ctx) + defer cancel() v, err := nodeApi.Version(ctx) if err != nil { @@ -180,6 +182,13 @@ var runCmd = &cli.Command{ return xerrors.Errorf("set storage config: %w", err) } + { + // init datastore for r.Exists + _, err := lr.Datastore("/") + if err != nil { + return err + } + } if err := lr.Close(); err != nil { return xerrors.Errorf("close repo: %w", err) } @@ -240,7 +249,12 @@ var runCmd = &cli.Command{ Next: mux.ServeHTTP, } - srv := &http.Server{Handler: ah} + srv := &http.Server{ + Handler: ah, + BaseContext: func(listener net.Listener) context.Context { + return ctx + }, + } go func() { <-ctx.Done() @@ -258,6 +272,14 @@ var runCmd = &cli.Command{ log.Info("Waiting for tasks") + go func() { + if err := nodeApi.WorkerConnect(ctx, "ws://"+cctx.String("address")+"/rpc/v0"); err != nil { + log.Errorf("Registering worker failed: %+v", err) + cancel() + return + } + }() + // todo go register return srv.Serve(nl) diff --git a/lib/lotuslog/levels.go b/lib/lotuslog/levels.go index e49c46006..5f92ccc65 100644 --- a/lib/lotuslog/levels.go +++ b/lib/lotuslog/levels.go @@ -14,5 +14,7 @@ func SetupLogLevels() { logging.SetLogLevel("bitswap", "WARN") //logging.SetLogLevel("pubsub", "WARN") logging.SetLogLevel("connmgr", "WARN") + logging.SetLogLevel("advmgr", "DEBUG") + logging.SetLogLevel("stores", "DEBUG") } } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 6ec6d9acb..3227a975a 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -145,12 +145,14 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe } func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error { - _, err := advmgr.ConnectRemote(ctx, sm.Full, url) + w, err := advmgr.ConnectRemote(ctx, sm, url) if err != nil { - return err + return xerrors.Errorf("connecting remote storage failed: %w", err) } - panic("todo register ") + log.Infof("Connected to a remote worker at %s", url) + + return sm.StorageMgr.AddWorker(w) } func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index 89518ec69..87c56e8a1 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -4,6 +4,7 @@ import ( "context" "io" "net/http" + "sync" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -40,6 +41,8 @@ type Manager struct { remoteHnd *stores.FetchHandler storage2.Prover + + lk sync.Mutex } func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, urls URLs, sindex stores.SectorIndex) (*Manager, error) { @@ -59,7 +62,7 @@ func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, ur m := &Manager{ workers: []Worker{ - &LocalWorker{scfg: cfg, localStore: stor, storage: stor, sindex: sindex}, + //&LocalWorker{scfg: cfg, localStore: stor, storage: stor, sindex: sindex}, }, scfg: cfg, @@ -91,6 +94,14 @@ func (m *Manager) AddLocalStorage(path string) error { return nil } +func (m *Manager) AddWorker(w Worker) error { + m.lk.Lock() + defer m.lk.Unlock() + + m.workers = append(m.workers, w) + return nil +} + func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.remoteHnd.ServeHTTP(w, r) } @@ -115,6 +126,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor continue } if _, ok := tt[task]; !ok { + log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt) continue } @@ -139,6 +151,8 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor } } if st == nil { + log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths) + log.Debugf("skipping worker %d; only has %v", i, phs) continue } @@ -168,6 +182,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err) } + log.Debugf("find workers for %v", best) candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best) if len(candidateWorkers) == 0 { diff --git a/storage/sealmgr/advmgr/local.go b/storage/sealmgr/advmgr/worker_local.go similarity index 100% rename from storage/sealmgr/advmgr/local.go rename to storage/sealmgr/advmgr/worker_local.go diff --git a/storage/sealmgr/advmgr/remote.go b/storage/sealmgr/advmgr/worker_remote.go similarity index 92% rename from storage/sealmgr/advmgr/remote.go rename to storage/sealmgr/advmgr/worker_remote.go index 27699434e..734e14ebc 100644 --- a/storage/sealmgr/advmgr/remote.go +++ b/storage/sealmgr/advmgr/worker_remote.go @@ -24,7 +24,7 @@ func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes [ return abi.PieceInfo{}, xerrors.New("unsupported") } -func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) { +func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, error) { token, err := fa.AuthNew(ctx, []api.Permission{"admin"}) if err != nil { return nil, xerrors.Errorf("creating auth token for remote connection: %w", err) diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index 4b9a0ed71..4cf3cf565 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -212,13 +212,18 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s } func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) { + st.localLk.RLock() + defer st.localLk.RUnlock() + var out []StorageMeta for _, p := range st.paths { if sealing && !p.meta.CanSeal { + log.Debugf("alloc: not considering %s; can't seal", p.meta.ID) continue } if !sealing && !p.meta.CanStore { + log.Debugf("alloc: not considering %s; can't store", p.meta.ID) continue } @@ -236,6 +241,9 @@ func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sea } func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) { + st.localLk.RLock() + defer st.localLk.RUnlock() + var out []StorageMeta for _, p := range st.paths { p.lk.Lock() @@ -254,6 +262,9 @@ func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ( } func (st *Local) Local() []StoragePath { + st.localLk.RLock() + defer st.localLk.RUnlock() + var out []StoragePath for _, p := range st.paths { if p.local == "" {