From 64a3a7239e7ecd068d399ab0a8992cd0360ff7ed Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 25 Nov 2019 14:12:11 -0800 Subject: [PATCH] Set RAY_FORCE_DIRECT=1 for run_rllib_tests, test_basic (#6171) --- .gitignore | 1 + ci/jenkins_tests/run_multi_node_tests.sh | 8 +- ci/jenkins_tests/run_rllib_tests.sh | 208 +++++++++--------- ci/suppress_output | 6 + python/ray/tests/BUILD | 20 +- python/ray/tests/test_actor.py | 8 + python/ray/tests/test_actor_direct.py | 16 ++ python/ray/tests/test_basic.py | 27 +++ python/ray/tests/test_basic_direct.py | 16 ++ src/ray/core_worker/core_worker.cc | 15 +- src/ray/core_worker/core_worker.h | 3 + .../transport/dependency_resolver.cc | 2 +- .../transport/dependency_resolver.h | 3 +- .../core_worker/transport/raylet_transport.cc | 3 +- src/ray/raylet/raylet_client.cc | 8 + src/ray/rpc/client_call.h | 8 +- 16 files changed, 229 insertions(+), 123 deletions(-) create mode 100644 python/ray/tests/test_actor_direct.py create mode 100644 python/ray/tests/test_basic_direct.py diff --git a/.gitignore b/.gitignore index 642aca1b0..3b2ab0753 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ /python/ray/pickle5_files/ /python/build /python/dist +/python/python-driver-* /thirdparty/pkg/ /build/java diff --git a/ci/jenkins_tests/run_multi_node_tests.sh b/ci/jenkins_tests/run_multi_node_tests.sh index 62f9b9838..1ec7ece1a 100755 --- a/ci/jenkins_tests/run_multi_node_tests.sh +++ b/ci/jenkins_tests/run_multi_node_tests.sh @@ -15,6 +15,10 @@ DOCKER_SHA=$($ROOT_DIR/../../build-docker.sh --output-sha --no-cache) SUPPRESS_OUTPUT=$ROOT_DIR/../suppress_output echo "Using Docker image" $DOCKER_SHA +######################## RLLIB TESTS ################################# + +source $ROOT_DIR/run_rllib_tests.sh + ######################## EXAMPLE TESTS ################################# $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ @@ -32,10 +36,6 @@ $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/doc/examples/doc_code/tf_example.py -######################## RLLIB TESTS ################################# - -source $ROOT_DIR/run_rllib_tests.sh - ######################## TUNE TESTS ################################# bash $ROOT_DIR/run_tune_tests.sh ${MEMORY_SIZE} ${SHM_SIZE} $DOCKER_SHA diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index 72d96fa0a..4411c410b 100755 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -1,62 +1,62 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_catalog.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_catalog.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_optimizers.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_optimizers.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_filters.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_filters.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_evaluators.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_evaluators.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_eager_support.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_eager_support.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env PongDeterministic-v0 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pong-ram-v4 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env PongDeterministic-v0 \ --run A2C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "model": {"free_log_std": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"simple_optimizer": false, "num_sgd_iter": 2, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"simple_optimizer": true, "num_sgd_iter": 2, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ @@ -64,215 +64,215 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --ray-num-gpus 1 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "use_gae": false, "batch_mode": "complete_episodes"}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"remote_worker_envs": true, "remote_env_batch_wait_ms": 99999999, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"remote_worker_envs": true, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run APPO \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "num_gpus": 0}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run ES \ --stop '{"training_iteration": 1}' \ --config '{"stepsize": 0.01, "episodes_per_batch": 20, "train_batch_size": 100, "num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pong-v0 \ --run ES \ --stop '{"training_iteration": 1}' \ --config '{"stepsize": 0.01, "episodes_per_batch": 20, "train_batch_size": 100, "num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run A3C \ --stop '{"training_iteration": 1}' \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 1}' \ --config '{"lr": 1e-3, "schedule_max_timesteps": 100000, "exploration_fraction": 0.1, "exploration_final_eps": 0.02, "dueling": false, "hiddens": [], "model": {"fcnet_hiddens": [64], "fcnet_activation": "relu"}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run APEX \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "timesteps_per_iteration": 1000, "num_gpus": 0, "min_iter_time_s": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env FrozenLake-v0 \ --run DQN \ --stop '{"training_iteration": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env FrozenLake-v0 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"num_sgd_iter": 10, "sgd_minibatch_size": 64, "train_batch_size": 1000, "num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env PongDeterministic-v4 \ --run DQN \ --stop '{"training_iteration": 1}' \ --config '{"lr": 1e-4, "schedule_max_timesteps": 2000000, "buffer_size": 10000, "exploration_fraction": 0.1, "exploration_final_eps": 0.01, "sample_batch_size": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env MontezumaRevenge-v0 \ --run PPO \ --stop '{"training_iteration": 1}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "model": {"dim": 40, "conv_filters": [[16, [8, 8], 4], [32, [4, 4], 2], [512, [5, 5], 1]]}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "use_pytorch": true}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "num_workers": 1, "model": {"use_lstm": true, "max_seq_len": 100}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "num_workers": 1, "num_envs_per_worker": 10}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pong-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env FrozenLake-v0 \ --run PG \ --stop '{"training_iteration": 1}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run DDPG \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 1}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 1}' \ --config '{"num_gpus": 0, "num_workers": 2, "num_aggregation_workers": 2, "min_iter_time_s": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 1}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 1}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_data_loader_buffers": 2, "replay_buffer_num_slots": 100, "replay_proportion": 1.0}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 1}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_data_loader_buffers": 2, "replay_buffer_num_slots": 100, "replay_proportion": 1.0, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env MountainCarContinuous-v0 \ --run DDPG \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env MountainCarContinuous-v0 \ --run DDPG \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run APEX_DDPG \ --ray-num-cpus 8 \ @@ -280,7 +280,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --config '{"num_workers": 2, "optimizer": {"num_replay_buffer_shards": 1}, "learning_starts": 100, "min_iter_time_s": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run APEX_DDPG \ --ray-num-cpus 8 \ @@ -288,196 +288,196 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --config '{"num_workers": 2, "optimizer": {"num_replay_buffer_shards": 1}, "learning_starts": 100, "min_iter_time_s": 1, "batch_mode": "complete_episodes", "parameter_noise": false}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run MARWIL \ --stop '{"training_iteration": 1}' \ --config '{"input": "/ray/rllib/tests/data/cartpole_small", "learning_starts": 0, "input_evaluation": ["wis", "is"], "shuffle_buffer_size": 10}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 1}' \ --config '{"input": "/ray/rllib/tests/data/cartpole_small", "learning_starts": 0, "input_evaluation": ["wis", "is"], "soft_q": true}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_local.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_local.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_reproducibility.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_reproducibility.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_dependency.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_dependency.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_legacy.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_legacy.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_io.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_io.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_checkpoint_restore.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_checkpoint_restore.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_rollout_worker.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_rollout_worker.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_nested_spaces.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_nested_spaces.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_external_env.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_external_env.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_external_multi_agent_env.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_external_multi_agent_env.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_model.py --run=A2C --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_keras_model.py --run=A2C --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_model.py --run=PPO --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_keras_model.py --run=PPO --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_model.py --run=DQN --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_keras_model.py --run=DQN --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/parametric_action_cartpole.py --run=PG --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/parametric_action_cartpole.py --run=PG --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/parametric_action_cartpole.py --run=PPO --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/parametric_action_cartpole.py --run=PPO --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/parametric_action_cartpole.py --run=DQN --stop=50 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/parametric_action_cartpole.py --run=DQN --stop=50 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_lstm.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_lstm.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=PPO + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=PPO docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=PG + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=PG docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=DQN + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=DQN docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=DDPG + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/batch_norm_model.py --num-iters=1 --run=DDPG docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_multi_agent_env.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_multi_agent_env.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_supported_spaces.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_supported_spaces.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_env_with_subprocess.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_env_with_subprocess.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/tests/test_rollout.sh + /ray/ci/suppress_output --force-direct /ray/rllib/tests/test_rollout.sh # Run all single-agent regression tests (3x retry each) for yaml in $(ls $ROOT_DIR/../../rllib/tuned_examples/regression_tests); do docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/run_regression_tests.py \ + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/run_regression_tests.py \ /ray/rllib/tuned_examples/regression_tests/$yaml done # Try a couple times since it's stochastic docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/multiagent_pendulum.py || \ + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/multiagent_pendulum.py || \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/multiagent_pendulum.py || \ + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/multiagent_pendulum.py || \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/multiagent_pendulum.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/multiagent_pendulum.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/multiagent_cartpole.py --num-iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/multiagent_cartpole.py --num-iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/multiagent_two_trainers.py --num-iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/multiagent_two_trainers.py --num-iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_avail_actions_qmix.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_avail_actions_qmix.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/cartpole_lstm.py --run=PPO --stop=200 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/cartpole_lstm.py --run=PPO --stop=200 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/cartpole_lstm.py --run=IMPALA --stop=100 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/cartpole_lstm.py --run=IMPALA --stop=100 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/cartpole_lstm.py --stop=200 --use-prev-action-reward + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/cartpole_lstm.py --stop=200 --use-prev-action-reward docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_loss.py --iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_loss.py --iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/rollout_worker_custom_workflow.py + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/rollout_worker_custom_workflow.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/eager_execution.py --iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/eager_execution.py --iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_tf_policy.py --iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_tf_policy.py --iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_torch_policy.py --iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_torch_policy.py --iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/rollout_worker_custom_workflow.py + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/rollout_worker_custom_workflow.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_metrics_and_callbacks.py --num-iters=2 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_metrics_and_callbacks.py --num-iters=2 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/contrib/random_agent/random_agent.py + /ray/ci/suppress_output --force-direct python /ray/rllib/contrib/random_agent/random_agent.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/centralized_critic.py --stop=2000 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/centralized_critic.py --stop=2000 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/centralized_critic_2.py --stop=2000 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/centralized_critic_2.py --stop=2000 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=contrib/MADDPG + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/twostep_game.py --stop=2000 --run=contrib/MADDPG docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=PG + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/twostep_game.py --stop=2000 --run=PG docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=QMIX + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/twostep_game.py --stop=2000 --run=QMIX docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=APEX_QMIX + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/twostep_game.py --stop=2000 --run=APEX_QMIX docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/autoregressive_action_dist.py --stop=150 + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/autoregressive_action_dist.py --stop=150 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env PongDeterministic-v4 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "use_pytorch": true, "sample_async": false, "model": {"use_lstm": false, "grayscale": true, "zero_mean": false, "dim": 84}, "preprocessor_pref": "rllib"}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env CartPole-v1 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "use_pytorch": true, "sample_async": false}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env Pendulum-v0 \ --run A3C \ --stop '{"training_iteration": 1}' \ --config '{"num_workers": 2, "use_pytorch": true, "sample_async": false}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output /ray/rllib/train.py \ + /ray/ci/suppress_output --force-direct /ray/rllib/train.py \ --env PongDeterministic-v4 \ --run IMPALA \ --stop='{"timesteps_total": 40000}' \ @@ -485,13 +485,13 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --config '{"num_workers": 1, "num_gpus": 0, "num_envs_per_worker": 32, "sample_batch_size": 50, "train_batch_size": 50, "learner_queue_size": 1}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/agents/impala/vtrace_test.py + /ray/ci/suppress_output --force-direct python /ray/rllib/agents/impala/vtrace_test.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_ignore_worker_failure.py + /ray/ci/suppress_output --force-direct python /ray/rllib/tests/test_ignore_worker_failure.py docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_rnn_model.py --run=PPO --stop=50 --env=RepeatAfterMeEnv + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_keras_rnn_model.py --run=PPO --stop=50 --env=RepeatAfterMeEnv docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_rnn_model.py --run=PPO --stop=50 --env=RepeatInitialEnv + /ray/ci/suppress_output --force-direct python /ray/rllib/examples/custom_keras_rnn_model.py --run=PPO --stop=50 --env=RepeatInitialEnv diff --git a/ci/suppress_output b/ci/suppress_output index 583bd6275..8510c5367 100755 --- a/ci/suppress_output +++ b/ci/suppress_output @@ -19,6 +19,12 @@ watchdog() { watchdog & 2>/dev/null WATCHDOG_PID=$! +if [ "$1" == "--force-direct" ]; then + echo "Setting RAY_FORCE_DIRECT=1" + export RAY_FORCE_DIRECT=1 + shift +fi + time "$@" >$TMPFILE 2>&1 CODE=$? diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 78356ef82..37d11e9ee 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -1,11 +1,19 @@ py_test( name = "test_actor", - size = "large", + size = "medium", srcs = ["test_actor.py"], tags = ["exclusive"], deps = ["//:ray_lib"], ) +py_test( + name = "test_actor_direct", + size = "medium", + srcs = ["test_actor_direct.py", "test_actor.py"], + tags = ["exclusive", "manual"], + deps = ["//:ray_lib"], +) + py_test( name = "test_actor_resources", size = "large", @@ -24,12 +32,20 @@ py_test( py_test( name = "test_basic", - size = "large", + size = "medium", srcs = ["test_basic.py"], tags = ["exclusive"], deps = ["//:ray_lib"], ) +py_test( + name = "test_basic_direct", + size = "medium", + srcs = ["test_basic_direct.py", "test_basic.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_advanced", size = "large", diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 53c8957b3..1d5713ff9 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -18,6 +18,8 @@ import ray.test_utils import ray.cluster_utils from ray.test_utils import run_string_as_driver +RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT")) + def test_actor_init_error_propagated(ray_start_regular): @ray.remote @@ -807,6 +809,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet") def test_actor_init_fails(ray_start_cluster_head): cluster = ray_start_cluster_head remote_node = cluster.add_node() @@ -832,6 +835,7 @@ def test_actor_init_fails(ray_start_cluster_head): assert results == [1 for actor in actors] +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet") def test_reconstruction_suppression(ray_start_cluster_head): cluster = ray_start_cluster_head num_nodes = 5 @@ -1148,6 +1152,7 @@ def setup_queue_actor(): ray.shutdown() +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock") def test_fork(setup_queue_actor): queue = setup_queue_actor @@ -1166,6 +1171,7 @@ def test_fork(setup_queue_actor): assert filtered_items == list(range(1)) +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock") def test_fork_consistency(setup_queue_actor): queue = setup_queue_actor @@ -1197,6 +1203,7 @@ def test_fork_consistency(setup_queue_actor): assert filtered_items == list(range(num_items_per_fork)) +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock") def test_pickled_handle_consistency(setup_queue_actor): queue = setup_queue_actor @@ -1230,6 +1237,7 @@ def test_pickled_handle_consistency(setup_queue_actor): assert filtered_items == list(range(num_items_per_fork)) +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock") def test_nested_fork(setup_queue_actor): queue = setup_queue_actor diff --git a/python/ray/tests/test_actor_direct.py b/python/ray/tests/test_actor_direct.py new file mode 100644 index 000000000..c302b4596 --- /dev/null +++ b/python/ray/tests/test_actor_direct.py @@ -0,0 +1,16 @@ +"""Wrapper script that sets RAY_FORCE_DIRECT.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import sys +import os + +if __name__ == "__main__": + os.environ["RAY_FORCE_DIRECT"] = "1" + sys.exit( + pytest.main( + ["-v", + os.path.join(os.path.dirname(__file__), "test_actor.py")])) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 16aef4830..3f1197bb2 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -5,6 +5,7 @@ from __future__ import print_function import collections import io +import os import json import logging import re @@ -23,6 +24,8 @@ import ray.test_utils logger = logging.getLogger(__name__) +RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT")) + def test_simple_serialization(ray_start_regular): primitive_objects = [ @@ -91,6 +94,7 @@ def test_simple_serialization(ray_start_regular): assert type(obj) == type(new_obj_2) +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="resource shape not implemented") def test_fair_queueing(shutdown_only): ray.init( num_cpus=1, _internal_config=json.dumps({ @@ -1025,6 +1029,7 @@ def test_defining_remote_functions(shutdown_only): assert ray.get(m.remote(1)) == 2 +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented") def test_submit_api(shutdown_only): ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @@ -1083,6 +1088,7 @@ def test_submit_api(shutdown_only): assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2] +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented") def test_many_fractional_resources(shutdown_only): ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) @@ -1318,6 +1324,27 @@ def test_direct_call_chain(ray_start_cluster): assert ray.get(x) == 100 +def test_direct_inline_arg_memory_corruption(ray_start_regular): + @ray.remote + def f(): + return np.zeros(1000, dtype=np.uint8) + + @ray.remote + class Actor(object): + def __init__(self): + self.z = [] + + def add(self, x): + self.z.append(x) + for prev in self.z: + assert np.sum(prev) == 0, ("memory corruption detected", prev) + + a = Actor.options(is_direct_call=True).remote() + f_direct = f.options(is_direct_call=True) + for i in range(100): + ray.get(a.add.remote(f_direct.remote())) + + def test_direct_actor_enabled(ray_start_regular): @ray.remote class Actor(object): diff --git a/python/ray/tests/test_basic_direct.py b/python/ray/tests/test_basic_direct.py new file mode 100644 index 000000000..028c46cfa --- /dev/null +++ b/python/ray/tests/test_basic_direct.py @@ -0,0 +1,16 @@ +"""Wrapper script that sets RAY_FORCE_DIRECT.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import sys +import os + +if __name__ == "__main__": + os.environ["RAY_FORCE_DIRECT"] = "1" + sys.exit( + pytest.main( + ["-v", + os.path.join(os.path.dirname(__file__), "test_basic.py")])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8df0ab721..173499637 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -28,11 +28,6 @@ void BuildCommonTaskSpec( // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { - // TODO(ekl) remove this check once we deprecate TaskTransportType::RAYLET - if (transport_type == ray::TaskTransportType::RAYLET) { - RAY_CHECK(!arg.GetReference().IsDirectCallType()) - << "Passing direct call objects to non-direct tasks is not allowed."; - } builder.AddByRefArg(arg.GetReference()); } else { builder.AddByValueArg(arg.GetValue()); @@ -169,6 +164,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RAY_CHECK_OK(plasma_store_provider_->Put(obj, obj_id)); }, ref_counting_enabled ? reference_counter_ : nullptr, raylet_client_)); + resolver_.reset(new LocalDependencyResolver(memory_store_)); // Create an entry for the driver task in the task table. This task is // added immediately with status RUNNING. This allows us to push errors @@ -621,7 +617,12 @@ Status CoreWorker::CreateActor(const RayFunction &function, *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); PinObjectReferences(task_spec, TaskTransportType::RAYLET); - return raylet_client_->SubmitTask(task_spec); + // TODO(ekl) if we moved actor creation to use direct call tasks, then we won't + // need to manually resolve direct call args here. + resolver_->ResolveDependencies(task_spec, [this, task_spec]() { + RAY_CHECK_OK(raylet_client_->SubmitTask(task_spec)); + }); + return Status::OK(); } Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, @@ -874,7 +875,7 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, metadata = std::make_shared( const_cast(task.ArgMetadata(i)), task.ArgMetadataSize(i)); } - args->at(i) = std::make_shared(data, metadata); + args->at(i) = std::make_shared(data, metadata, /*copy_data*/ true); arg_reference_ids->at(i) = ObjectID::Nil(); } } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 172567f1d..ef91c8b06 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -522,6 +522,9 @@ class CoreWorker { /// Map from actor ID to a handle to that actor. absl::flat_hash_map> actor_handles_; + /// Resolve local and remote dependencies for actor creation. + std::unique_ptr resolver_; + /// /// Fields related to task execution. /// diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index a30acade8..52930d503 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -42,7 +42,7 @@ void DoInlineObjectValue(const ObjectID &obj_id, std::shared_ptr valu RAY_CHECK(found) << "obj id " << obj_id << " not found"; } -void LocalDependencyResolver::ResolveDependencies(const TaskSpecification &task, +void LocalDependencyResolver::ResolveDependencies(TaskSpecification &task, std::function on_complete) { absl::flat_hash_set local_dependencies; for (size_t i = 0; i < task.NumArgs(); i++) { diff --git a/src/ray/core_worker/transport/dependency_resolver.h b/src/ray/core_worker/transport/dependency_resolver.h index 16644c2bc..ae65dddbd 100644 --- a/src/ray/core_worker/transport/dependency_resolver.h +++ b/src/ray/core_worker/transport/dependency_resolver.h @@ -22,8 +22,7 @@ class LocalDependencyResolver { /// Note: This method **will mutate** the given TaskSpecification. /// /// Postcondition: all direct call ids in arguments are converted to values. - void ResolveDependencies(const TaskSpecification &task, - std::function on_complete); + void ResolveDependencies(TaskSpecification &task, std::function on_complete); /// Return the number of tasks pending dependency resolution. /// TODO(ekl) this should be exposed in worker stats. diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index d1709df12..ac17eca80 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -17,7 +17,8 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( rpc::SendReplyCallback send_reply_callback) { const Task task(request.task()); const auto &task_spec = task.GetTaskSpecification(); - RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId(); + RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId() << " is create " + << task_spec.IsActorCreationTask(); // Set the resource IDs for this task. // TODO: convert the resource map to protobuf and change this. diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 67eea7436..75e4f970d 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -230,6 +230,14 @@ RayletClient::RayletClient(std::shared_ptr gr ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) { ray::rpc::SubmitTaskRequest request; + for (size_t i = 0; i < task_spec.NumArgs(); i++) { + if (task_spec.ArgByRef(i)) { + for (size_t j = 0; j < task_spec.ArgIdCount(i); j++) { + RAY_CHECK(!task_spec.ArgId(i, j).IsDirectCallType()) + << "Passing direct call objects to non-direct tasks is not allowed."; + } + } + } request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); return grpc_client_->SubmitTask(request, /*callback=*/nullptr); } diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 8e748b685..a62f5147f 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -198,8 +198,12 @@ class ClientCallManager { auto status = cqs_[index].AsyncNext(&got_tag, &ok, deadline); if (status == grpc::CompletionQueue::SHUTDOWN) { break; - } - if (status != grpc::CompletionQueue::TIMEOUT) { + } else if (status == grpc::CompletionQueue::TIMEOUT && shutdown_) { + // If we timed out and shutdown, then exit immediately. This should not + // be needed, but gRPC seems to not return SHUTDOWN correctly in these + // cases (e.g., test_wait will hang on shutdown without this check). + break; + } else if (status != grpc::CompletionQueue::TIMEOUT) { auto tag = reinterpret_cast(got_tag); if (ok && !main_service_.stopped() && !shutdown_) { // Post the callback to the main event loop.