diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index 7aedacba1..5bd3a74e5 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -127,11 +127,11 @@ class AWSNodeProvider(NodeProvider): return [node.id for node in nodes] def is_running(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) return node.state["Name"] == "running" def is_terminated(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) state = node.state["Name"] return state not in ["running", "pending"] @@ -142,10 +142,20 @@ class AWSNodeProvider(NodeProvider): return dict(d1, **d2) def external_ip(self, node_id): - return self._node(node_id).public_ip_address + node = self._get_cached_node(node_id) + + if node.public_ip_address is None: + node = self._get_node(node_id) + + return node.public_ip_address def internal_ip(self, node_id): - return self._node(node_id).private_ip_address + node = self._get_cached_node(node_id) + + if node.private_ip_address is None: + node = self._get_node(node_id) + + return node.private_ip_address def set_node_tags(self, node_id, tags): with self.tag_cache_lock: @@ -205,7 +215,7 @@ class AWSNodeProvider(NodeProvider): self.ec2.create_instances(**conf) def terminate_node(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) node.terminate() self.tag_cache.pop(node_id, None) @@ -218,14 +228,22 @@ class AWSNodeProvider(NodeProvider): self.tag_cache.pop(node_id, None) self.tag_cache_pending.pop(node_id, None) - def _node(self, node_id): - if node_id not in self.cached_nodes: - self.nodes({}) # Side effect: should cache it. + def _get_node(self, node_id): + """Refresh and get info for this node, updating the cache.""" + self.nodes({}) # Side effect: fetches and caches the node. 
assert node_id in self.cached_nodes, "Invalid instance id {}".format( node_id) + return self.cached_nodes[node_id] + def _get_cached_node(self, node_id): + """Return node info from the cache if possible, otherwise fetch it.""" + if node_id in self.cached_nodes: + return self.cached_nodes[node_id] + + return self._get_node(node_id) + def cleanup(self): self.tag_cache_update_event.set() self.tag_cache_kill_event.set() diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml index 7b3d2b444..c505cfabe 100644 --- a/python/ray/autoscaler/gcp/example-full.yaml +++ b/python/ray/autoscaler/gcp/example-full.yaml @@ -61,7 +61,7 @@ head_node: initializeParams: diskSizeGb: 50 # See https://cloud.google.com/compute/docs/images for more images - sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-1604-lts # Ubuntu + sourceImage: projects/ml-images/global/images/family/tf-1-12-gpu # Additional options can be found in in the compute docs at # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert @@ -84,7 +84,7 @@ worker_nodes: initializeParams: diskSizeGb: 50 # See https://cloud.google.com/compute/docs/images for more images - sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-1604-lts # Ubuntu + sourceImage: projects/ml-images/global/images/family/tf-1-12-gpu # Run workers on preemtible instance by default. # Comment this out to use on-demand. scheduling: @@ -102,54 +102,33 @@ file_mounts: { # List of shell commands to run to set up nodes. setup_commands: - # Consider uncommenting these if you also want to run apt-get commands during setup - # - sudo pkill -9 apt-get || true - # - sudo pkill -9 dpkg || true - # - sudo dpkg --configure -a + # Note: if you're developing Ray, you probably want to create a boot disk that + # has your Ray repo pre-cloned. Then, you can replace the pip installs + # below with a git checkout (and possibly a recompile). 
+ # - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc - # Install basics. - - sudo apt-get update - >- - sudo apt-get install -y - cmake - pkg-config - build-essential - autoconf - curl - libtool - unzip - flex - bison - python + sudo apt-get update + && sudo apt-get install -y + psmisc + # Install Anaconda. - >- wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh -O ~/anaconda3.sh || true - - bash ~/anaconda3.sh -b -p ~/anaconda3 || true - - rm ~/anaconda3.sh - - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc + && bash ~/anaconda3.sh -b -p ~/anaconda3 || true + && rm ~/anaconda3.sh + && echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.profile - # Build Ray. - # Note: if you're developing Ray, you probably want to create a boot-disk - # that has your Ray repo pre-cloned. Then, you can replace the pip installs - # below with a git checkout (and possibly a recompile). - - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc - - >- - pip install - google-api-python-client==1.6.7 - cython==0.29.0 + # Install ray # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.6.2-cp27-cp27mu-manylinux1_x86_64.whl # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.6.2-cp35-cp35m-manylinux1_x86_64.whl - # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.6.2-cp36-cp36m-manylinux1_x86_64.whl - - >- - cd ~ - && git clone https://github.com/ray-project/ray || true - - >- - cd ~/ray/python - && pip install -e . --verbose + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.6.2-cp36-cp36m-manylinux1_x86_64.whl + # Custom commands that will be run on the head node after common setup. -head_setup_commands: [] +head_setup_commands: + - pip install google-api-python-client==1.7.8 # Custom commands that will be run on worker nodes after common setup. 
worker_setup_commands: [] diff --git a/python/ray/autoscaler/gcp/node_provider.py b/python/ray/autoscaler/gcp/node_provider.py index b31ab1af6..787b9d8e1 100644 --- a/python/ray/autoscaler/gcp/node_provider.py +++ b/python/ray/autoscaler/gcp/node_provider.py @@ -51,10 +51,6 @@ class GCPNodeProvider(NodeProvider): # excessive DescribeInstances requests. self.cached_nodes = {} - # Cache of ip lookups. We assume IPs never change once assigned. - self.internal_ip_cache = {} - self.external_ip_cache = {} - def nodes(self, tag_filters): if tag_filters: label_filter_expr = "(" + " AND ".join([ @@ -97,15 +93,15 @@ class GCPNodeProvider(NodeProvider): return [i["name"] for i in instances] def is_running(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) return node["status"] == "RUNNING" def is_terminated(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) return node["status"] not in {"PROVISIONING", "STAGING", "RUNNING"} def node_tags(self, node_id): - node = self._node(node_id) + node = self._get_cached_node(node_id) labels = node.get("labels", {}) return labels @@ -114,7 +110,7 @@ class GCPNodeProvider(NodeProvider): project_id = self.provider_config["project_id"] availability_zone = self.provider_config["availability_zone"] - node = self._node(node_id) + node = self._get_node(node_id) operation = self.compute.instances().setLabels( project=project_id, zone=availability_zone, @@ -130,23 +126,30 @@ class GCPNodeProvider(NodeProvider): return result def external_ip(self, node_id): - if node_id in self.external_ip_cache: - return self.external_ip_cache[node_id] - node = self._node(node_id) - # TODO: Is there a better and more reliable way to do this? 
- ip = (node.get("networkInterfaces", [{}])[0].get( - "accessConfigs", [{}])[0].get("natIP", None)) - if ip: - self.external_ip_cache[node_id] = ip + node = self._get_cached_node(node_id) + + def get_external_ip(node): + return node.get("networkInterfaces", [{}])[0].get( + "accessConfigs", [{}])[0].get("natIP", None) + + ip = get_external_ip(node) + if ip is None: + node = self._get_node(node_id) + ip = get_external_ip(node) + return ip def internal_ip(self, node_id): - if node_id in self.internal_ip_cache: - return self.internal_ip_cache[node_id] - node = self._node(node_id) - ip = node.get("networkInterfaces", [{}])[0].get("networkIP") - if ip: - self.internal_ip_cache[node_id] = ip + node = self._get_cached_node(node_id) + + def get_internal_ip(node): + return node.get("networkInterfaces", [{}])[0].get("networkIP") + + ip = get_internal_ip(node) + if ip is None: + node = self._get_node(node_id) + ip = get_internal_ip(node) + return ip def create_node(self, base_config, tags, count): @@ -206,14 +209,16 @@ class GCPNodeProvider(NodeProvider): return result - def _node(self, node_id): + def _get_node(self, node_id): + self.nodes({}) # Side effect: fetches and caches the node. + + assert node_id in self.cached_nodes, "Invalid instance id {}".format( + node_id) + + return self.cached_nodes[node_id] + + def _get_cached_node(self, node_id): if node_id in self.cached_nodes: return self.cached_nodes[node_id] - instance = self.compute.instances().get( - project=self.provider_config["project_id"], - zone=self.provider_config["availability_zone"], - instance=node_id, - ).execute() - - return instance + return self._get_node(node_id)