workers: Actually register in miner
commit 94ebbd9d90
parent d8c8832a37
@@ -105,6 +105,8 @@ var runCmd = &cli.Command{
         }
         defer closer()
         ctx := lcli.ReqContext(cctx)
+        ctx, cancel := context.WithCancel(ctx)
+        defer cancel()
 
         v, err := nodeApi.Version(ctx)
         if err != nil {
@@ -180,6 +182,13 @@ var runCmd = &cli.Command{
             return xerrors.Errorf("set storage config: %w", err)
         }
 
+        {
+            // init datastore for r.Exists
+            _, err := lr.Datastore("/")
+            if err != nil {
+                return err
+            }
+        }
         if err := lr.Close(); err != nil {
             return xerrors.Errorf("close repo: %w", err)
         }
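Note: per the inline comment, the added block opens the "/" datastore purely for its side effect: the handle is discarded and the repo is closed right afterwards, so the call presumably just forces the datastore to be initialized on disk so that a later r.Exists check passes.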
@@ -240,7 +249,12 @@ var runCmd = &cli.Command{
             Next: mux.ServeHTTP,
         }
 
-        srv := &http.Server{Handler: ah}
+        srv := &http.Server{
+            Handler: ah,
+            BaseContext: func(listener net.Listener) context.Context {
+                return ctx
+            },
+        }
 
         go func() {
             <-ctx.Done()
@@ -258,6 +272,14 @@ var runCmd = &cli.Command{
 
         log.Info("Waiting for tasks")
 
+        go func() {
+            if err := nodeApi.WorkerConnect(ctx, "ws://"+cctx.String("address")+"/rpc/v0"); err != nil {
+                log.Errorf("Registering worker failed: %+v", err)
+                cancel()
+                return
+            }
+        }()
+
         // todo go register
 
         return srv.Serve(nl)
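Taken together, the three runCmd hunks above tie the worker's lifetime to a single cancellable context: request contexts derive from it via BaseContext, a shutdown goroutine closes the server when it is cancelled, and a failed registration cancels it. Below is a minimal self-contained sketch of that pattern, not lotus code; workerConnect and the addresses are hypothetical stand-ins for nodeApi.WorkerConnect and the real flags.

    package main

    import (
        "context"
        "errors"
        "log"
        "net"
        "net/http"
    )

    // Stand-in for nodeApi.WorkerConnect; it always fails here so the
    // teardown path is exercised.
    func workerConnect(ctx context.Context, url string) error {
        return errors.New("miner unreachable")
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        srv := &http.Server{
            Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                // Every request context derives from ctx via BaseContext.
                w.WriteHeader(http.StatusOK)
            }),
            BaseContext: func(net.Listener) context.Context { return ctx },
        }

        nl, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            log.Fatal(err)
        }

        // Mirror of the shutdown goroutine: cancelling ctx closes the server.
        go func() {
            <-ctx.Done()
            _ = srv.Close()
        }()

        // Mirror of the registration goroutine: failure cancels ctx,
        // which in turn stops Serve below.
        go func() {
            if err := workerConnect(ctx, "ws://127.0.0.1:2345/rpc/v0"); err != nil {
                log.Printf("registering worker failed: %+v", err)
                cancel()
            }
        }()

        if err := srv.Serve(nl); err != nil && !errors.Is(err, http.ErrServerClosed) {
            log.Fatal(err)
        }
    }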
@@ -14,5 +14,7 @@ func SetupLogLevels() {
         logging.SetLogLevel("bitswap", "WARN")
         //logging.SetLogLevel("pubsub", "WARN")
         logging.SetLogLevel("connmgr", "WARN")
+        logging.SetLogLevel("advmgr", "DEBUG")
+        logging.SetLogLevel("stores", "DEBUG")
     }
 }
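Note: raising advmgr and stores to DEBUG is what makes the log.Debugf calls added to the manager and local-store code later in this commit visible by default.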
@@ -145,12 +145,14 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe
 }
 
 func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
-    _, err := advmgr.ConnectRemote(ctx, sm.Full, url)
+    w, err := advmgr.ConnectRemote(ctx, sm, url)
     if err != nil {
-        return err
+        return xerrors.Errorf("connecting remote storage failed: %w", err)
     }
 
-    panic("todo register ")
+    log.Infof("Connected to a remote worker at %s", url)
+
+    return sm.StorageMgr.AddWorker(w)
 }
 
 func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
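Note: this replaces the placeholder panic with the real registration path: ConnectRemote dials the worker back (now authenticating against the miner API itself; see the api.Common change below) and the resulting remote worker is handed to the storage manager through the new AddWorker method.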
@@ -4,6 +4,7 @@ import (
     "context"
     "io"
     "net/http"
+    "sync"
 
     "github.com/ipfs/go-cid"
     logging "github.com/ipfs/go-log/v2"
@@ -40,6 +41,8 @@ type Manager struct {
     remoteHnd *stores.FetchHandler
 
     storage2.Prover
+
+    lk sync.Mutex
 }
 
 func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, urls URLs, sindex stores.SectorIndex) (*Manager, error) {
@@ -59,7 +62,7 @@ func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, ur
 
     m := &Manager{
         workers: []Worker{
-            &LocalWorker{scfg: cfg, localStore: stor, storage: stor, sindex: sindex},
+            //&LocalWorker{scfg: cfg, localStore: stor, storage: stor, sindex: sindex},
         },
         scfg: cfg,
 
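Note: with the in-process LocalWorker commented out, the manager starts with an empty worker list, so all sealing capacity has to arrive through the AddWorker registration path added below.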
@@ -91,6 +94,14 @@ func (m *Manager) AddLocalStorage(path string) error {
     return nil
 }
 
+func (m *Manager) AddWorker(w Worker) error {
+    m.lk.Lock()
+    defer m.lk.Unlock()
+
+    m.workers = append(m.workers, w)
+    return nil
+}
+
 func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     m.remoteHnd.ServeHTTP(w, r)
 }
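A self-contained sketch of the locking pattern AddWorker uses; the Worker interface and stub below are hypothetical stand-ins, not lotus types. Registrations arrive concurrently from RPC handlers, so the slice append is guarded by the new lk mutex.

    package main

    import (
        "fmt"
        "sync"
    )

    type Worker interface {
        Name() string
    }

    type Manager struct {
        lk      sync.Mutex
        workers []Worker
    }

    // AddWorker mirrors the method above: append under the lock so
    // concurrent registrations don't race on the slice.
    func (m *Manager) AddWorker(w Worker) error {
        m.lk.Lock()
        defer m.lk.Unlock()

        m.workers = append(m.workers, w)
        return nil
    }

    type stubWorker struct{ name string }

    func (s stubWorker) Name() string { return s.name }

    func main() {
        m := &Manager{}
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                _ = m.AddWorker(stubWorker{name: fmt.Sprintf("worker-%d", i)})
            }(i)
        }
        wg.Wait()
        fmt.Println(len(m.workers), "workers registered")
    }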
@@ -115,6 +126,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
             continue
         }
         if _, ok := tt[task]; !ok {
+            log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt)
             continue
         }
 
@@ -139,6 +151,8 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
             }
         }
         if st == nil {
+            log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths)
+            log.Debugf("skipping worker %d; only has %v", i, phs)
             continue
         }
 
@@ -168,6 +182,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
         return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
     }
 
+    log.Debugf("find workers for %v", best)
     candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best)
 
     if len(candidateWorkers) == 0 {
@@ -24,7 +24,7 @@ func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes [
     return abi.PieceInfo{}, xerrors.New("unsupported")
 }
 
-func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) {
+func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, error) {
     token, err := fa.AuthNew(ctx, []api.Permission{"admin"})
     if err != nil {
         return nil, xerrors.Errorf("creating auth token for remote connection: %w", err)
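The signature change from api.FullNode to api.Common narrows ConnectRemote's dependency to what it actually calls (AuthNew), which is why the miner API value sm can now be passed in directly. A minimal sketch of that accept-the-smallest-interface pattern, with hypothetical names standing in for the lotus types:

    package main

    import (
        "context"
        "fmt"
    )

    // Narrow capability, analogous to the AuthNew method on api.Common.
    type AuthNewer interface {
        AuthNew(ctx context.Context, perms []string) ([]byte, error)
    }

    // Analogous to StorageMinerAPI: it satisfies AuthNewer even though it
    // is not a full node.
    type minerAPI struct{}

    func (minerAPI) AuthNew(ctx context.Context, perms []string) ([]byte, error) {
        return []byte("token"), nil
    }

    // Accepting the narrow interface lets any token-minting API be used.
    func connectRemote(ctx context.Context, a AuthNewer, url string) error {
        token, err := a.AuthNew(ctx, []string{"admin"})
        if err != nil {
            return err
        }
        fmt.Printf("dialing %s with token %s\n", url, token)
        return nil
    }

    func main() {
        _ = connectRemote(context.Background(), minerAPI{}, "ws://127.0.0.1:2345/rpc/v0")
    }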
@@ -212,13 +212,18 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
 }
 
 func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageMeta, error) {
+    st.localLk.RLock()
+    defer st.localLk.RUnlock()
+
     var out []StorageMeta
 
     for _, p := range st.paths {
         if sealing && !p.meta.CanSeal {
+            log.Debugf("alloc: not considering %s; can't seal", p.meta.ID)
             continue
         }
         if !sealing && !p.meta.CanStore {
+            log.Debugf("alloc: not considering %s; can't store", p.meta.ID)
             continue
         }
 
@@ -236,6 +241,9 @@ func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sea
 }
 
 func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
+    st.localLk.RLock()
+    defer st.localLk.RUnlock()
+
     var out []StorageMeta
     for _, p := range st.paths {
         p.lk.Lock()
@@ -254,6 +262,9 @@ func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) (
 }
 
 func (st *Local) Local() []StoragePath {
+    st.localLk.RLock()
+    defer st.localLk.RUnlock()
+
     var out []StoragePath
     for _, p := range st.paths {
         if p.local == "" {
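All three hunks above add the same read-side locking: FindBestAllocStorage, FindSector, and Local only read st.paths, so they take localLk.RLock, letting queries run concurrently while writers take the exclusive lock. A minimal sketch of that sync.RWMutex pattern, with hypothetical fields rather than the real StoragePath bookkeeping:

    package main

    import (
        "fmt"
        "sync"
    )

    type Local struct {
        localLk sync.RWMutex
        paths   []string
    }

    // Read path: many List calls may hold the read lock at once.
    func (st *Local) List() []string {
        st.localLk.RLock()
        defer st.localLk.RUnlock()

        out := make([]string, len(st.paths))
        copy(out, st.paths)
        return out
    }

    // Write path: adding a path takes the exclusive lock.
    func (st *Local) Add(p string) {
        st.localLk.Lock()
        defer st.localLk.Unlock()

        st.paths = append(st.paths, p)
    }

    func main() {
        st := &Local{}
        st.Add("/data/sector-0")
        fmt.Println(st.List())
    }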