diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 2ccdb4be2..97980a641 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -48,6 +48,7 @@ py_test_module_list( "test_metrics.py", "test_multi_node.py", "test_multi_node_2.py", + "test_multi_node_3.py", "test_multi_tenancy.py", "test_multinode_failures.py", "test_multinode_failures_2.py", diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index fbce475c1..ae9ae1c1e 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -1,15 +1,13 @@ import os import pytest -import subprocess import sys import time import ray -from ray.test_utils import ( - RayTestTimeoutException, check_call_ray, run_string_as_driver, - run_string_as_driver_nonblocking, wait_for_children_of_pid, - wait_for_children_of_pid_to_exit, wait_for_condition, kill_process_by_name, - Semaphore, init_error_pubsub, get_error_message) +from ray.test_utils import (RayTestTimeoutException, run_string_as_driver, + run_string_as_driver_nonblocking, + wait_for_condition, init_error_pubsub, + get_error_message) def test_remote_raylet_cleanup(ray_start_cluster): @@ -368,385 +366,6 @@ print("success") process_handle.kill() -def test_calling_start_ray_head(call_ray_stop_only): - - # Test that we can call ray start with various command line - # parameters. TODO(rkn): This test only tests the --head code path. We - # should also test the non-head node code path. - - # Test starting Ray with a redis port specified. - check_call_ray(["start", "--head", "--port", "0"]) - check_call_ray(["stop"]) - - # Test starting Ray with a node IP address specified. - check_call_ray( - ["start", "--head", "--node-ip-address", "127.0.0.1", "--port", "0"]) - check_call_ray(["stop"]) - - # Test starting Ray with a system config parameter set. - check_call_ray([ - "start", "--head", "--system-config", - "{\"metrics_report_interval_ms\":100}", "--port", "0" - ]) - check_call_ray(["stop"]) - - # Test starting Ray with the object manager and node manager ports - # specified. - check_call_ray([ - "start", "--head", "--object-manager-port", "12345", - "--node-manager-port", "54321", "--port", "0" - ]) - check_call_ray(["stop"]) - - # Test starting Ray with the worker port range specified. - check_call_ray([ - "start", "--head", "--min-worker-port", "50000", "--max-worker-port", - "51000", "--port", "0" - ]) - check_call_ray(["stop"]) - - # Test starting Ray with a worker port list. - check_call_ray(["start", "--head", "--worker-port-list", "10000,10001"]) - check_call_ray(["stop"]) - - # Test starting Ray with a non-int in the worker port list. - with pytest.raises(subprocess.CalledProcessError): - check_call_ray(["start", "--head", "--worker-port-list", "10000,a"]) - check_call_ray(["stop"]) - - # Test starting Ray with an invalid port in the worker port list. - with pytest.raises(subprocess.CalledProcessError): - check_call_ray(["start", "--head", "--worker-port-list", "100"]) - check_call_ray(["stop"]) - - # Test starting Ray with the number of CPUs specified. - check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"]) - check_call_ray(["stop"]) - - # Test starting Ray with the number of GPUs specified. - check_call_ray(["start", "--head", "--num-gpus", "100", "--port", "0"]) - check_call_ray(["stop"]) - - # Test starting Ray with redis shard ports specified. - check_call_ray([ - "start", "--head", "--redis-shard-ports", "6380,6381,6382", "--port", - "0" - ]) - check_call_ray(["stop"]) - - # Test starting Ray with all arguments specified. - check_call_ray([ - "start", "--head", "--redis-shard-ports", "6380,6381,6382", - "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", - "--resources", "{\"Custom\": 1}", "--port", "0" - ]) - check_call_ray(["stop"]) - - # Test starting Ray with invalid arguments. - with pytest.raises(subprocess.CalledProcessError): - check_call_ray( - ["start", "--head", "--address", "127.0.0.1:6379", "--port", "0"]) - check_call_ray(["stop"]) - - # Test --block. Killing a child process should cause the command to exit. - blocked = subprocess.Popen( - ["ray", "start", "--head", "--block", "--port", "0"]) - - wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30) - - blocked.poll() - assert blocked.returncode is None - - kill_process_by_name("raylet") - wait_for_children_of_pid_to_exit(blocked.pid, timeout=30) - blocked.wait() - assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit" - - # Test --block. Killing the command should clean up all child processes. - blocked = subprocess.Popen( - ["ray", "start", "--head", "--block", "--port", "0"]) - blocked.poll() - assert blocked.returncode is None - - wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30) - - blocked.terminate() - wait_for_children_of_pid_to_exit(blocked.pid, timeout=30) - blocked.wait() - assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit" - - -@pytest.mark.parametrize( - "call_ray_start", - ["ray start --head --num-cpus=1 " + "--node-ip-address=localhost"], - indirect=True) -def test_using_hostnames(call_ray_start): - ray.init(_node_ip_address="localhost", address="localhost:6379") - - @ray.remote - def f(): - return 1 - - assert ray.get(f.remote()) == 1 - - -def test_connecting_in_local_case(ray_start_regular): - address_info = ray_start_regular - - # Define a driver that just connects to Redis. - driver_script = """ -import ray -ray.init(address="{}") -print("success") -""".format(address_info["redis_address"]) - - out = run_string_as_driver(driver_script) - # Make sure the other driver succeeded. - assert "success" in out - - -def test_run_driver_twice(ray_start_regular): - # We used to have issue 2165 and 2288: - # https://github.com/ray-project/ray/issues/2165 - # https://github.com/ray-project/ray/issues/2288 - # both complain that driver will hang when run for the second time. - # This test is used to verify the fix for above issue, it will run the - # same driver for twice and verify whether both of them succeed. - address_info = ray_start_regular - driver_script = """ -import ray -import ray.tune as tune -import os -import time - -def train_func(config, reporter): # add a reporter arg - for i in range(2): - time.sleep(0.1) - reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics - -os.environ["TUNE_RESUME_PROMPT_OFF"] = "True" -ray.init(address="{}") -ray.tune.register_trainable("train_func", train_func) - -tune.run_experiments({{ - "my_experiment": {{ - "run": "train_func", - "stop": {{"mean_accuracy": 99}}, - "config": {{ - "layer1": {{ - "class_name": tune.grid_search(["a"]), - "config": {{"lr": tune.grid_search([1, 2])}} - }}, - }}, - "local_dir": os.path.expanduser("~/tmp") - }} -}}) -print("success") -""".format(address_info["redis_address"]) - - for i in range(2): - out = run_string_as_driver(driver_script) - assert "success" in out - - -@pytest.mark.skip(reason="fate sharing not implemented yet") -def test_driver_exiting_when_worker_blocked(call_ray_start): - # This test will create some drivers that submit some tasks and then - # exit without waiting for the tasks to complete. - address = call_ray_start - - ray.init(address=address) - - # Define a driver that creates two tasks, one that runs forever and the - # other blocked on the first in a `ray.get`. - driver_script = """ -import time -import ray -ray.init(address="{}") -@ray.remote -def f(): - time.sleep(10**6) -@ray.remote -def g(): - ray.get(f.remote()) -g.remote() -time.sleep(1) -print("success") -""".format(address) - - # Create some drivers and let them exit and make sure everything is - # still alive. - for _ in range(3): - out = run_string_as_driver(driver_script) - # Make sure the first driver ran to completion. - assert "success" in out - - # Define a driver that creates two tasks, one that runs forever and the - # other blocked on the first in a `ray.wait`. - driver_script = """ -import time -import ray -ray.init(address="{}") -@ray.remote -def f(): - time.sleep(10**6) -@ray.remote -def g(): - ray.wait([f.remote()]) -g.remote() -time.sleep(1) -print("success") -""".format(address) - - # Create some drivers and let them exit and make sure everything is - # still alive. - for _ in range(3): - out = run_string_as_driver(driver_script) - # Make sure the first driver ran to completion. - assert "success" in out - - # Define a driver that creates one task that depends on a nonexistent - # object. This task will be queued as waiting to execute. - driver_script_template = """ -import time -import ray -ray.init(address="{}") -@ray.remote -def g(x): - return -g.remote(ray.ObjectRef(ray.utils.hex_to_binary("{}"))) -time.sleep(1) -print("success") -""" - - # Create some drivers and let them exit and make sure everything is - # still alive. - for _ in range(3): - nonexistent_id = ray.ObjectRef.from_random() - driver_script = driver_script_template.format(address, - nonexistent_id.hex()) - out = run_string_as_driver(driver_script) - # Simulate the nonexistent dependency becoming available. - ray.worker.global_worker.put_object(None, nonexistent_id) - # Make sure the first driver ran to completion. - assert "success" in out - - # Define a driver that calls `ray.wait` on a nonexistent object. - driver_script_template = """ -import time -import ray -ray.init(address="{}") -@ray.remote -def g(): - ray.wait(ray.ObjectRef(ray.utils.hex_to_binary("{}"))) -g.remote() -time.sleep(1) -print("success") -""" - - # Create some drivers and let them exit and make sure everything is - # still alive. - for _ in range(3): - nonexistent_id = ray.ObjectRef.from_random() - driver_script = driver_script_template.format(address, - nonexistent_id.hex()) - out = run_string_as_driver(driver_script) - # Simulate the nonexistent dependency becoming available. - ray.worker.global_worker.put_object(None, nonexistent_id) - # Make sure the first driver ran to completion. - assert "success" in out - - @ray.remote - def f(): - return 1 - - # Make sure we can still talk with the raylet. - ray.get(f.remote()) - - -def test_multi_driver_logging(ray_start_regular): - address_info = ray_start_regular - address = address_info["redis_address"] - - # ray.init(address=address) - driver1_wait = Semaphore.options(name="driver1_wait").remote(value=0) - driver2_wait = Semaphore.options(name="driver2_wait").remote(value=0) - main_wait = Semaphore.options(name="main_wait").remote(value=0) - - # The creation of an actor is asynchronous. - # We need to wait for the completion of the actor creation, - # otherwise we can't get the actor by name. - ray.get(driver1_wait.locked.remote()) - ray.get(driver2_wait.locked.remote()) - ray.get(main_wait.locked.remote()) - - # Params are address, semaphore name, output1, output2 - driver_script_template = """ -import ray -import sys -from ray.test_utils import Semaphore - -@ray.remote(num_cpus=0) -def remote_print(s, file=None): - print(s, file=file) - -ray.init(address="{}") - -driver_wait = ray.get_actor("{}") -main_wait = ray.get_actor("main_wait") - -ray.get(main_wait.release.remote()) -ray.get(driver_wait.acquire.remote()) - -s1 = "{}" -ray.get(remote_print.remote(s1)) - -ray.get(main_wait.release.remote()) -ray.get(driver_wait.acquire.remote()) - -s2 = "{}" -ray.get(remote_print.remote(s2)) - -ray.get(main_wait.release.remote()) - """ - - p1 = run_string_as_driver_nonblocking( - driver_script_template.format(address, "driver1_wait", "1", "2")) - p2 = run_string_as_driver_nonblocking( - driver_script_template.format(address, "driver2_wait", "3", "4")) - - ray.get(main_wait.acquire.remote()) - ray.get(main_wait.acquire.remote()) - # At this point both of the other drivers are fully initialized. - - ray.get(driver1_wait.release.remote()) - ray.get(driver2_wait.release.remote()) - - # At this point driver1 should receive '1' and driver2 '3' - ray.get(main_wait.acquire.remote()) - ray.get(main_wait.acquire.remote()) - - ray.get(driver1_wait.release.remote()) - ray.get(driver2_wait.release.remote()) - - # At this point driver1 should receive '2' and driver2 '4' - ray.get(main_wait.acquire.remote()) - ray.get(main_wait.acquire.remote()) - - driver1_out = p1.stdout.read().decode("ascii") - driver2_out = p2.stdout.read().decode("ascii") - if sys.platform == "win32": - driver1_out = driver1_out.replace("\r", "") - driver2_out = driver2_out.replace("\r", "") - driver1_out_split = driver1_out.split("\n") - driver2_out_split = driver2_out.split("\n") - - assert driver1_out_split[0][-1] == "1", driver1_out_split - assert driver1_out_split[1][-1] == "2", driver1_out_split - assert driver2_out_split[0][-1] == "3", driver2_out_split - assert driver2_out_split[1][-1] == "4", driver2_out_split - - if __name__ == "__main__": import pytest # Make subprocess happy in bazel. diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py new file mode 100644 index 000000000..9c270b64d --- /dev/null +++ b/python/ray/tests/test_multi_node_3.py @@ -0,0 +1,397 @@ +import os +import pytest +import subprocess +import sys + +import ray +from ray.test_utils import ( + check_call_ray, run_string_as_driver, run_string_as_driver_nonblocking, + wait_for_children_of_pid, wait_for_children_of_pid_to_exit, + kill_process_by_name, Semaphore) + + +def test_calling_start_ray_head(call_ray_stop_only): + + # Test that we can call ray start with various command line + # parameters. TODO(rkn): This test only tests the --head code path. We + # should also test the non-head node code path. + + # Test starting Ray with a redis port specified. + check_call_ray(["start", "--head", "--port", "0"]) + check_call_ray(["stop"]) + + # Test starting Ray with a node IP address specified. + check_call_ray( + ["start", "--head", "--node-ip-address", "127.0.0.1", "--port", "0"]) + check_call_ray(["stop"]) + + # Test starting Ray with a system config parameter set. + check_call_ray([ + "start", "--head", "--system-config", + "{\"metrics_report_interval_ms\":100}", "--port", "0" + ]) + check_call_ray(["stop"]) + + # Test starting Ray with the object manager and node manager ports + # specified. + check_call_ray([ + "start", "--head", "--object-manager-port", "12345", + "--node-manager-port", "54321", "--port", "0" + ]) + check_call_ray(["stop"]) + + # Test starting Ray with the worker port range specified. + check_call_ray([ + "start", "--head", "--min-worker-port", "50000", "--max-worker-port", + "51000", "--port", "0" + ]) + check_call_ray(["stop"]) + + # Test starting Ray with a worker port list. + check_call_ray(["start", "--head", "--worker-port-list", "10000,10001"]) + check_call_ray(["stop"]) + + # Test starting Ray with a non-int in the worker port list. + with pytest.raises(subprocess.CalledProcessError): + check_call_ray(["start", "--head", "--worker-port-list", "10000,a"]) + check_call_ray(["stop"]) + + # Test starting Ray with an invalid port in the worker port list. + with pytest.raises(subprocess.CalledProcessError): + check_call_ray(["start", "--head", "--worker-port-list", "100"]) + check_call_ray(["stop"]) + + # Test starting Ray with the number of CPUs specified. + check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"]) + check_call_ray(["stop"]) + + # Test starting Ray with the number of GPUs specified. + check_call_ray(["start", "--head", "--num-gpus", "100", "--port", "0"]) + check_call_ray(["stop"]) + + # Test starting Ray with redis shard ports specified. + check_call_ray([ + "start", "--head", "--redis-shard-ports", "6380,6381,6382", "--port", + "0" + ]) + check_call_ray(["stop"]) + + # Test starting Ray with all arguments specified. + check_call_ray([ + "start", "--head", "--redis-shard-ports", "6380,6381,6382", + "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", + "--resources", "{\"Custom\": 1}", "--port", "0" + ]) + check_call_ray(["stop"]) + + # Test starting Ray with invalid arguments. + with pytest.raises(subprocess.CalledProcessError): + check_call_ray( + ["start", "--head", "--address", "127.0.0.1:6379", "--port", "0"]) + check_call_ray(["stop"]) + + # Test --block. Killing a child process should cause the command to exit. + blocked = subprocess.Popen( + ["ray", "start", "--head", "--block", "--port", "0"]) + + wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30) + + blocked.poll() + assert blocked.returncode is None + + kill_process_by_name("raylet") + wait_for_children_of_pid_to_exit(blocked.pid, timeout=30) + blocked.wait() + assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit" + + # Test --block. Killing the command should clean up all child processes. + blocked = subprocess.Popen( + ["ray", "start", "--head", "--block", "--port", "0"]) + blocked.poll() + assert blocked.returncode is None + + wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30) + + blocked.terminate() + wait_for_children_of_pid_to_exit(blocked.pid, timeout=30) + blocked.wait() + assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit" + + +@pytest.mark.parametrize( + "call_ray_start", + ["ray start --head --num-cpus=1 " + "--node-ip-address=localhost"], + indirect=True) +def test_using_hostnames(call_ray_start): + ray.init(_node_ip_address="localhost", address="localhost:6379") + + @ray.remote + def f(): + return 1 + + assert ray.get(f.remote()) == 1 + + +def test_connecting_in_local_case(ray_start_regular): + address_info = ray_start_regular + + # Define a driver that just connects to Redis. + driver_script = """ +import ray +ray.init(address="{}") +print("success") +""".format(address_info["redis_address"]) + + out = run_string_as_driver(driver_script) + # Make sure the other driver succeeded. + assert "success" in out + + +def test_run_driver_twice(ray_start_regular): + # We used to have issue 2165 and 2288: + # https://github.com/ray-project/ray/issues/2165 + # https://github.com/ray-project/ray/issues/2288 + # both complain that driver will hang when run for the second time. + # This test is used to verify the fix for above issue, it will run the + # same driver for twice and verify whether both of them succeed. + address_info = ray_start_regular + driver_script = """ +import ray +import ray.tune as tune +import os +import time + +def train_func(config, reporter): # add a reporter arg + for i in range(2): + time.sleep(0.1) + reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics + +os.environ["TUNE_RESUME_PROMPT_OFF"] = "True" +ray.init(address="{}") +ray.tune.register_trainable("train_func", train_func) + +tune.run_experiments({{ + "my_experiment": {{ + "run": "train_func", + "stop": {{"mean_accuracy": 99}}, + "config": {{ + "layer1": {{ + "class_name": tune.grid_search(["a"]), + "config": {{"lr": tune.grid_search([1, 2])}} + }}, + }}, + "local_dir": os.path.expanduser("~/tmp") + }} +}}) +print("success") +""".format(address_info["redis_address"]) + + for i in range(2): + out = run_string_as_driver(driver_script) + assert "success" in out + + +@pytest.mark.skip(reason="fate sharing not implemented yet") +def test_driver_exiting_when_worker_blocked(call_ray_start): + # This test will create some drivers that submit some tasks and then + # exit without waiting for the tasks to complete. + address = call_ray_start + + ray.init(address=address) + + # Define a driver that creates two tasks, one that runs forever and the + # other blocked on the first in a `ray.get`. + driver_script = """ +import time +import ray +ray.init(address="{}") +@ray.remote +def f(): + time.sleep(10**6) +@ray.remote +def g(): + ray.get(f.remote()) +g.remote() +time.sleep(1) +print("success") +""".format(address) + + # Create some drivers and let them exit and make sure everything is + # still alive. + for _ in range(3): + out = run_string_as_driver(driver_script) + # Make sure the first driver ran to completion. + assert "success" in out + + # Define a driver that creates two tasks, one that runs forever and the + # other blocked on the first in a `ray.wait`. + driver_script = """ +import time +import ray +ray.init(address="{}") +@ray.remote +def f(): + time.sleep(10**6) +@ray.remote +def g(): + ray.wait([f.remote()]) +g.remote() +time.sleep(1) +print("success") +""".format(address) + + # Create some drivers and let them exit and make sure everything is + # still alive. + for _ in range(3): + out = run_string_as_driver(driver_script) + # Make sure the first driver ran to completion. + assert "success" in out + + # Define a driver that creates one task that depends on a nonexistent + # object. This task will be queued as waiting to execute. + driver_script_template = """ +import time +import ray +ray.init(address="{}") +@ray.remote +def g(x): + return +g.remote(ray.ObjectRef(ray.utils.hex_to_binary("{}"))) +time.sleep(1) +print("success") +""" + + # Create some drivers and let them exit and make sure everything is + # still alive. + for _ in range(3): + nonexistent_id = ray.ObjectRef.from_random() + driver_script = driver_script_template.format(address, + nonexistent_id.hex()) + out = run_string_as_driver(driver_script) + # Simulate the nonexistent dependency becoming available. + ray.worker.global_worker.put_object(None, nonexistent_id) + # Make sure the first driver ran to completion. + assert "success" in out + + # Define a driver that calls `ray.wait` on a nonexistent object. + driver_script_template = """ +import time +import ray +ray.init(address="{}") +@ray.remote +def g(): + ray.wait(ray.ObjectRef(ray.utils.hex_to_binary("{}"))) +g.remote() +time.sleep(1) +print("success") +""" + + # Create some drivers and let them exit and make sure everything is + # still alive. + for _ in range(3): + nonexistent_id = ray.ObjectRef.from_random() + driver_script = driver_script_template.format(address, + nonexistent_id.hex()) + out = run_string_as_driver(driver_script) + # Simulate the nonexistent dependency becoming available. + ray.worker.global_worker.put_object(None, nonexistent_id) + # Make sure the first driver ran to completion. + assert "success" in out + + @ray.remote + def f(): + return 1 + + # Make sure we can still talk with the raylet. + ray.get(f.remote()) + + +def test_multi_driver_logging(ray_start_regular): + address_info = ray_start_regular + address = address_info["redis_address"] + + # ray.init(address=address) + driver1_wait = Semaphore.options(name="driver1_wait").remote(value=0) + driver2_wait = Semaphore.options(name="driver2_wait").remote(value=0) + main_wait = Semaphore.options(name="main_wait").remote(value=0) + + # The creation of an actor is asynchronous. + # We need to wait for the completion of the actor creation, + # otherwise we can't get the actor by name. + ray.get(driver1_wait.locked.remote()) + ray.get(driver2_wait.locked.remote()) + ray.get(main_wait.locked.remote()) + + # Params are address, semaphore name, output1, output2 + driver_script_template = """ +import ray +import sys +from ray.test_utils import Semaphore + +@ray.remote(num_cpus=0) +def remote_print(s, file=None): + print(s, file=file) + +ray.init(address="{}") + +driver_wait = ray.get_actor("{}") +main_wait = ray.get_actor("main_wait") + +ray.get(main_wait.release.remote()) +ray.get(driver_wait.acquire.remote()) + +s1 = "{}" +ray.get(remote_print.remote(s1)) + +ray.get(main_wait.release.remote()) +ray.get(driver_wait.acquire.remote()) + +s2 = "{}" +ray.get(remote_print.remote(s2)) + +ray.get(main_wait.release.remote()) + """ + + p1 = run_string_as_driver_nonblocking( + driver_script_template.format(address, "driver1_wait", "1", "2")) + p2 = run_string_as_driver_nonblocking( + driver_script_template.format(address, "driver2_wait", "3", "4")) + + ray.get(main_wait.acquire.remote()) + ray.get(main_wait.acquire.remote()) + # At this point both of the other drivers are fully initialized. + + ray.get(driver1_wait.release.remote()) + ray.get(driver2_wait.release.remote()) + + # At this point driver1 should receive '1' and driver2 '3' + ray.get(main_wait.acquire.remote()) + ray.get(main_wait.acquire.remote()) + + ray.get(driver1_wait.release.remote()) + ray.get(driver2_wait.release.remote()) + + # At this point driver1 should receive '2' and driver2 '4' + ray.get(main_wait.acquire.remote()) + ray.get(main_wait.acquire.remote()) + + driver1_out = p1.stdout.read().decode("ascii") + driver2_out = p2.stdout.read().decode("ascii") + if sys.platform == "win32": + driver1_out = driver1_out.replace("\r", "") + driver2_out = driver2_out.replace("\r", "") + driver1_out_split = driver1_out.split("\n") + driver2_out_split = driver2_out.split("\n") + + assert driver1_out_split[0][-1] == "1", driver1_out_split + assert driver1_out_split[1][-1] == "2", driver1_out_split + assert driver2_out_split[0][-1] == "3", driver2_out_split + assert driver2_out_split[1][-1] == "4", driver2_out_split + + +if __name__ == "__main__": + import pytest + # Make subprocess happy in bazel. + os.environ["LC_ALL"] = "en_US.UTF-8" + os.environ["LANG"] = "en_US.UTF-8" + sys.exit(pytest.main(["-v", __file__]))