From 5ab5017c6776ec9c49cb0bb0ee4bee9981df724e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 9 Jul 2019 20:22:30 -0700 Subject: [PATCH] [rllib] Fix impala stress test (#5101) * add copy * upgrade to tf 1.14 * update * reduce count to workaround https://github.com/ray-project/ray/issues/5125 * Update impala.py * placeholder * comments * update --- ci/long_running_tests/README.rst | 8 ++-- ci/long_running_tests/config.yaml | 10 ++++- ci/long_running_tests/start_workloads.sh | 44 ++++++------------- python/ray/rllib/optimizers/aso_aggregator.py | 9 +++- 4 files changed, 33 insertions(+), 38 deletions(-) diff --git a/ci/long_running_tests/README.rst b/ci/long_running_tests/README.rst index 3395f781a..c4c5475d6 100644 --- a/ci/long_running_tests/README.rst +++ b/ci/long_running_tests/README.rst @@ -7,14 +7,14 @@ intended to run forever until they fail. Running the Workloads --------------------- -To run the workloads, run +To run the workloads, first edit the config.yaml and replace +``RAY_WHEEL_TO_TEST_HERE`` with the desired version to test, then run: .. code-block:: bash - ./start_workloads.sh <ray_branch> <ray_version> <ray_commit> + ./start_workloads.sh -using the appropriate values of ``<ray_branch>``, ``<ray_version>``, and -``<ray_commit>``. This will start one EC2 instance per workload and will start +This will start one EC2 instance per workload and will start the workloads running (one per instance). Running the ``./start_workloads.sh`` script again will clean up any state from the previous runs and will start the workloads again. diff --git a/ci/long_running_tests/config.yaml b/ci/long_running_tests/config.yaml index fd6722951..f6ccaa93e 100644 --- a/ci/long_running_tests/config.yaml +++ b/ci/long_running_tests/config.yaml @@ -34,9 +34,17 @@ worker_nodes: # List of shell commands to run to set up nodes. setup_commands: + # Install latest TensorFlow + - source activate tensorflow_p36 && conda remove -y --force wrapt || true + - source activate tensorflow_p36 && pip install -U tensorflow==1.14 # Install nightly Ray wheels. 
- - source activate tensorflow_p36 && pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/<<<RAY_BRANCH>>>/<<<RAY_COMMIT>>>/ray-<<<RAY_VERSION>>>-cp36-cp36m-manylinux1_x86_64.whl + # Example: https://s3-us-west-2.amazonaws.com/ray-wheels/<<<RAY_BRANCH>>>/<<<RAY_COMMIT>>>/ray-<<<RAY_VERSION>>>-cp36-cp36m-manylinux1_x86_64.whl + - source activate tensorflow_p36 && pip install -U RAY_WHEEL_TO_TEST_HERE - source activate tensorflow_p36 && pip install ray[rllib] ray[debug] gym[atari] + - source activate tensorflow_p36 && pip install ray[debug] + - echo set-window-option -g mouse on > ~/.tmux.conf + - echo 'termcapinfo xterm* ti@:te@' > ~/.screenrc + # Uncomment the following if you wish to build Ray instead. # - sudo apt-get update # - sudo apt-get install -y build-essential curl unzip diff --git a/ci/long_running_tests/start_workloads.sh b/ci/long_running_tests/start_workloads.sh index fde3db30f..3d92a0739 100755 --- a/ci/long_running_tests/start_workloads.sh +++ b/ci/long_running_tests/start_workloads.sh @@ -3,40 +3,16 @@ set -e ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) - -if [[ -z "$1" ]]; then - echo "ERROR: The first argument must be the Ray branch to test (e.g., 'master')." - exit 1 -else - RAY_BRANCH=$1 -fi - -if [[ -z "$2" ]]; then - echo "ERROR: The second argument must be the Ray version to test (e.g., '0.8.0.dev1')." - exit 1 -else - RAY_VERSION=$2 -fi - -if [[ -z "$3" ]]; then - echo "ERROR: The third argument must be the Ray commit to test (e.g., '62e4b591e3d6443ce25b0f05cc32b43d5e2ebb3d')." - exit 1 -else - RAY_COMMIT=$3 -fi - -echo "Testing ray==$RAY_VERSION at commit $RAY_COMMIT." -echo "The wheels used will live under https://s3-us-west-2.amazonaws.com/ray-wheels/$RAY_BRANCH/$RAY_VERSION/$RAY_COMMIT/" - - pushd "$ROOT_DIR" # Substitute in the appropriate Ray version and commit in the config file and # store it in a temporary file. 
-CLUSTER_CONFIG="config_temporary.yaml" -sed -e "s/<<<RAY_BRANCH>>>/$RAY_BRANCH/g; - s/<<<RAY_VERSION>>>/$RAY_VERSION/g; - s/<<<RAY_COMMIT>>>/$RAY_COMMIT/;" config.yaml > "$CLUSTER_CONFIG" +CLUSTER_CONFIG="config.yaml" + +if grep -q RAY_WHEEL_TO_TEST_HERE $CLUSTER_CONFIG; then + echo "You must replace the RAY_WHEEL_TO_TEST_HERE string in $CLUSTER_CONFIG." + exit 1 +fi # Start one instance per workload. for workload_file in "$ROOT_DIR"/workloads/*; do @@ -47,6 +23,12 @@ done # Wait for all of the nodes to be up. wait +status=$? +if [ $status != 0 ]; then + echo "Some update processes failed with $status" + exit 1 +fi + # Start the workloads running. for workload_file in "$ROOT_DIR"/workloads/*; do file_name=$(basename -- "$workload_file") @@ -55,7 +37,7 @@ for workload_file in "$ROOT_DIR"/workloads/*; do # Copy the workload to the cluster. ray rsync_up $CLUSTER_CONFIG --cluster-name="$workload_name" "$workload_file" "$file_name" # Clean up previous runs if relevant. - ray exec $CLUSTER_CONFIG --cluster-name="$workload_name" "ray stop; rm -r /tmp/ray; tmux kill-server | true" + ray exec $CLUSTER_CONFIG --cluster-name="$workload_name" "source activate tensorflow_p36 && ray stop; rm -r /tmp/ray; tmux kill-server | true" # Start the workload. 
ray exec $CLUSTER_CONFIG --cluster-name="$workload_name" "source activate tensorflow_p36 && python $file_name" --tmux ) & diff --git a/python/ray/rllib/optimizers/aso_aggregator.py b/python/ray/rllib/optimizers/aso_aggregator.py index bc7c75bbf..79ef563f3 100644 --- a/python/ray/rllib/optimizers/aso_aggregator.py +++ b/python/ray/rllib/optimizers/aso_aggregator.py @@ -105,8 +105,13 @@ class AggregationWorkerBase(object): self.batch_buffer.append(sample_batch) if sum(b.count for b in self.batch_buffer) >= self.train_batch_size: - train_batch = self.batch_buffer[0].concat_samples( - self.batch_buffer) + if len(self.batch_buffer) == 1: + # make a defensive copy to avoid sharing plasma memory + # across multiple threads + train_batch = self.batch_buffer[0].copy() + else: + train_batch = self.batch_buffer[0].concat_samples( + self.batch_buffer) yield train_batch self.batch_buffer = []