support unspecified addresses for worker
allow operators to set an unspecified address - 0.0.0.0 - when setting address flag in 'lotus-worker run' to extract worker ip, dial the miner api. if the dial succeeds, a valid route between miner and worker can be inferred
This commit is contained in:
parent
694286b30d
commit
36472802f7
@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@ -259,8 +260,9 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
|
||||
log.Info("Opening local storage; connecting to master")
|
||||
address := cctx.String("address")
|
||||
|
||||
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + cctx.String("address") + "/remote"})
|
||||
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -289,7 +291,7 @@ var runCmd = &cli.Command{
|
||||
|
||||
mux := mux.NewRouter()
|
||||
|
||||
log.Info("Setting up control endpoint at " + cctx.String("address"))
|
||||
log.Info("Setting up control endpoint at " + address)
|
||||
|
||||
rpcServer := jsonrpc.NewServer()
|
||||
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
|
||||
@ -319,7 +321,7 @@ var runCmd = &cli.Command{
|
||||
log.Warn("Graceful shutdown successful")
|
||||
}()
|
||||
|
||||
nl, err := net.Listen("tcp", cctx.String("address"))
|
||||
nl, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -327,7 +329,19 @@ var runCmd = &cli.Command{
|
||||
log.Info("Waiting for tasks")
|
||||
|
||||
go func() {
|
||||
if err := nodeApi.WorkerConnect(ctx, "ws://"+cctx.String("address")+"/rpc/v0"); err != nil {
|
||||
const unspecifiedAddress = "0.0.0.0"
|
||||
addressSlice := strings.Split(address, ":")
|
||||
if ip := net.ParseIP(addressSlice[0]); ip != nil {
|
||||
if ip.String() == unspecifiedAddress {
|
||||
rip, err := extractRoutableIP()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
address = rip + ":" + addressSlice[1]
|
||||
}
|
||||
}
|
||||
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
|
||||
log.Errorf("Registering worker failed: %+v", err)
|
||||
cancel()
|
||||
return
|
||||
@ -376,3 +390,27 @@ func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageM
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func extractRoutableIP() (string, error) {
|
||||
minerMultiAddrKey := "MINER_API_INFO"
|
||||
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
|
||||
env, ok := os.LookupEnv(minerMultiAddrKey)
|
||||
if !ok {
|
||||
// TODO remove after deprecation period
|
||||
env, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
|
||||
if ok {
|
||||
log.Warnf("Use deprecation env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
|
||||
}
|
||||
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
|
||||
}
|
||||
minerAddr := strings.Split(env, "/")
|
||||
conn, err := net.Dial("tcp", minerAddr[2]+":"+minerAddr[4])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
localAddr := conn.LocalAddr().(*net.TCPAddr)
|
||||
|
||||
return strings.Split(localAddr.IP.String(), ":")[0], nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user