diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index e3a7128a6..81f9b1514 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -443,19 +443,17 @@ def test_multiple_raylets(ray_start_cluster): def test_custom_resources(ray_start_cluster): cluster = ray_start_cluster - cluster.add_node(num_cpus=3, resources={"CustomResource": 0}) + cluster.add_node(num_cpus=1, resources={"CustomResource": 0}) custom_resource_node = cluster.add_node( - num_cpus=3, resources={"CustomResource": 1}) + num_cpus=1, resources={"CustomResource": 1}) ray.init(address=cluster.address) @ray.remote def f(): - time.sleep(0.001) return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource": 1}) def g(): - time.sleep(0.001) return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource": 1}) @@ -463,9 +461,6 @@ def test_custom_resources(ray_start_cluster): ray.get([f.remote() for _ in range(5)]) return ray.worker.global_worker.node.unique_id - # The f tasks should be scheduled on both raylets. - assert len(set(ray.get([f.remote() for _ in range(500)]))) == 2 - # The g tasks should be scheduled only on the second raylet. raylet_ids = set(ray.get([g.remote() for _ in range(50)])) assert len(raylet_ids) == 1 diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 089dd1671..aae6bc4aa 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -2,11 +2,9 @@ import glob import logging import os -import shutil import json import sys import socket -import tempfile import time import numpy as np @@ -416,24 +414,6 @@ def test_ray_stack(ray_start_2_cpus): "'ray stack'") -def test_pandas_parquet_serialization(): - # Only test this if pandas is installed - pytest.importorskip("pandas") - - import pandas as pd - import pyarrow as pa - import pyarrow.parquet as pq - - tempdir = tempfile.mkdtemp() - filename = os.path.join(tempdir, "parquet-test") - pd.DataFrame({"col1": [0, 1], "col2": [0, 1]}).to_parquet(filename) - with open(os.path.join(tempdir, "parquet-compression"), "wb") as f: - table = pa.Table.from_arrays([pa.array([1, 2, 3])], ["hello"]) - pq.write_table(table, f, compression="lz4") - # Clean up - shutil.rmtree(tempdir) - - def test_socket_dir_not_existing(shutdown_only): if sys.platform != "win32": random_name = ray.ObjectRef.from_random().hex() diff --git a/python/ray/tests/test_dynres.py b/python/ray/tests/test_dynres.py index dda0872f8..1470f8919 100644 --- a/python/ray/tests/test_dynres.py +++ b/python/ray/tests/test_dynres.py @@ -286,14 +286,15 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster): target_node_id = node_ids[1] ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - def check_resources(): - return ray.cluster_resources().get(res_name, None) == res_capacity - - wait_for_condition(check_resources) + wait_for_condition( + lambda: ray.cluster_resources().get(res_name, None) == res_capacity) # Delete the resource ray.get(delete_res.remote(res_name, target_node_id)) + wait_for_condition( + lambda: ray.cluster_resources().get(res_name, None) is None) + # Define a task which requires this resource. This should not run @ray.remote(resources={res_name: res_capacity}) def test_func(): diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index 29cfecfcb..5b0e986bf 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -6,11 +6,12 @@ import ray def test_output(): + # Use subprocess to execute the __main__ below. outputs = subprocess.check_output( [sys.executable, __file__, "_ray_instance"], stderr=subprocess.STDOUT).decode() lines = outputs.split("\n") - assert len(lines) == 3 + assert len(lines) == 3, lines logging_header = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\sINFO\s" assert re.match( logging_header + r"resource_spec.py:\d+ -- Starting Ray with [0-9\.]+ " @@ -24,7 +25,7 @@ def test_output(): if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "_ray_instance": - ray.init() + ray.init(num_cpus=1) ray.shutdown() else: sys.exit(pytest.main(["-v", __file__]))