diff --git a/.bazelrc b/.bazelrc index 2fc26dd44..8ef0453f3 100644 --- a/.bazelrc +++ b/.bazelrc @@ -29,3 +29,11 @@ build --http_timeout_scaling=5.0 # This workaround is due to an incompatibility of # bazel_common/tools/maven/pom_file.bzl with Bazel 1.0 build --incompatible_depset_is_not_iterable=false + +# Thread sanitizer configuration: +build:tsan --strip=never +build:tsan --copt -fsanitize=thread +build:tsan --copt -DTHREAD_SANITIZER +build:tsan --copt -g +build:tsan --copt -fno-omit-frame-pointer +build:tsan --linkopt -fsanitize=thread diff --git a/.travis.yml b/.travis.yml index 6702f3fda..92eba64c2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,6 +80,22 @@ matrix: - go get github.com/bazelbuild/buildtools/buildifier - ./ci/travis/bazel-format.sh + - os: linux + env: SANITIZER=1 CC=clang PYTHON=3.5 PYTHONWARNINGS=ignore + + install: + - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py` + - if [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi + + - ./ci/suppress_output ./ci/travis/install-bazel.sh + - ./ci/suppress_output ./ci/travis/install-dependencies.sh + - export PATH="$HOME/miniconda/bin:$PATH" + - ./ci/suppress_output ./ci/travis/install-ray.sh + + script: + # Run core worker tests with thread sanitizer + - RAY_BAZEL_CONFIG="--config=tsan" TSAN_OPTIONS="report_atomic_races=0" ./ci/suppress_output bash src/ray/test/run_core_worker_tests.sh + # Build Linux wheels. - os: linux env: LINUX_WHEELS=1 PYTHONWARNINGS=ignore diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 4123f865d..461a0ab14 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -44,7 +44,7 @@ ActorHandle::ActorHandle(const std::string &serialized) void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, const ObjectID new_cursor) { - std::unique_lock guard(mutex_); + absl::MutexLock guard(&mutex_); // Build actor task spec. 
const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID()); const ObjectID actor_creation_dummy_object_id = @@ -59,7 +59,7 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); } void ActorHandle::Reset() { - std::unique_lock guard(mutex_); + absl::MutexLock guard(&mutex_); task_counter_ = 0; actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor()); } diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index f2b6bd92c..68554de0b 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -54,10 +54,16 @@ class ActorHandle { void Reset(); // Mark the actor handle as dead. - void MarkDead() { state_ = rpc::ActorTableData::DEAD; } + void MarkDead() { + absl::MutexLock lock(&mutex_); + state_ = rpc::ActorTableData::DEAD; + } // Returns whether the actor is known to be dead. - bool IsDead() const { return state_ == rpc::ActorTableData::DEAD; } + bool IsDead() const { + absl::MutexLock lock(&mutex_); + return state_ == rpc::ActorTableData::DEAD; + } private: // Protobuf-defined persistent state of the actor handle. @@ -65,18 +71,18 @@ class ActorHandle { /// The actor's state (alive or dead). This defaults to ALIVE. Once marked /// DEAD, the actor handle can never go back to being ALIVE. - rpc::ActorTableData::ActorState state_ = rpc::ActorTableData::ALIVE; + rpc::ActorTableData::ActorState state_ GUARDED_BY(mutex_) = rpc::ActorTableData::ALIVE; /// The unique id of the dummy object returned by the previous task. /// TODO: This can be removed once we schedule actor tasks by task counter /// only. // TODO: Save this state in the core worker. - ObjectID actor_cursor_; + ObjectID actor_cursor_ GUARDED_BY(mutex_); // Number of tasks that have been submitted on this handle. 
- uint64_t task_counter_ = 0; + uint64_t task_counter_ GUARDED_BY(mutex_) = 0; - /// Guards actor_cursor_ and task_counter_. - std::mutex mutex_; + /// Mutex to protect fields in the actor handle. + mutable absl::Mutex mutex_; FRIEND_TEST(ZeroNodeTest, TestActorHandle); }; diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index a62f5147f..3a053ff0b 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -3,6 +3,7 @@ #include #include +#include "absl/synchronization/mutex.h" #include "ray/common/grpc_util.h" #include "ray/common/status.h" @@ -22,6 +23,8 @@ class ClientCall { virtual void OnReplyReceived() = 0; /// Return status. virtual ray::Status GetStatus() = 0; + /// Set return status. + virtual void SetReturnStatus() = 0; virtual ~ClientCall() = default; }; @@ -46,11 +49,24 @@ class ClientCallImpl : public ClientCall { /// \param[in] callback The callback function to handle the reply. explicit ClientCallImpl(const ClientCallback &callback) : callback_(callback) {} - Status GetStatus() override { return GrpcStatusToRayStatus(status_); } + Status GetStatus() override { + absl::MutexLock lock(&mutex_); + return return_status_; + } + + void SetReturnStatus() override { + absl::MutexLock lock(&mutex_); + return_status_ = GrpcStatusToRayStatus(status_); + } void OnReplyReceived() override { + ray::Status status; + { + absl::MutexLock lock(&mutex_); + status = return_status_; + } if (callback_ != nullptr) { - callback_(GrpcStatusToRayStatus(status_), reply_); + callback_(status, reply_); } } @@ -67,6 +83,16 @@ class ClientCallImpl : public ClientCall { /// gRPC status of this request. grpc::Status status_; + /// Mutex to protect the return_status_ field. + absl::Mutex mutex_; + + /// This is the status to be returned from GetStatus(). It is safe + /// to read from other threads while they hold mutex_. 
We have + /// return_status_ = GrpcStatusToRayStatus(status_) but need + /// a separate variable because status_ is written internally by + /// gRPC, and we cannot make gRPC take mutex_ while it does so. + ray::Status return_status_ GUARDED_BY(mutex_); + /// Context for the client. It could be used to convey extra information to /// the server and/or tweak certain RPC behaviors. grpc::ClientContext context_; @@ -205,6 +231,7 @@ class ClientCallManager { break; } else if (status != grpc::CompletionQueue::TIMEOUT) { auto tag = reinterpret_cast(got_tag); + tag->GetCall()->SetReturnStatus(); if (ok && !main_service_.stopped() && !shutdown_) { // Post the callback to the main event loop. main_service_.post([tag]() { diff --git a/src/ray/test/run_core_worker_tests.sh b/src/ray/test/run_core_worker_tests.sh index 709d30daa..4a602d2cd 100644 --- a/src/ray/test/run_core_worker_tests.sh +++ b/src/ray/test/run_core_worker_tests.sh @@ -22,7 +22,7 @@ fi set -e set -x -bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server" +bazel build -c dbg $RAY_BAZEL_CONFIG "//:core_worker_test" "//:mock_worker" "//:raylet" "//:raylet_monitor" "//:libray_redis_module.so" "@plasma//:plasma_store_server" # Get the directory in which this script is executing. SCRIPT_DIR="`dirname \"$0\"`" @@ -39,24 +39,25 @@ if [ ! 
-d "$RAY_ROOT/python" ]; then fi REDIS_MODULE="./bazel-bin/libray_redis_module.so" +BAZEL_BIN_PREFIX="$(bazel info -c dbg $RAY_BAZEL_CONFIG bazel-bin)" LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}" -STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server" -RAYLET_EXEC="./bazel-bin/raylet" -RAYLET_MONITOR_EXEC="./bazel-bin/raylet_monitor" -MOCK_WORKER_EXEC="./bazel-bin/mock_worker" +STORE_EXEC="$BAZEL_BIN_PREFIX/external/plasma/plasma_store_server" +RAYLET_EXEC="$BAZEL_BIN_PREFIX/raylet" +RAYLET_MONITOR_EXEC="$BAZEL_BIN_PREFIX/raylet_monitor" +MOCK_WORKER_EXEC="$BAZEL_BIN_PREFIX/mock_worker" # Allow cleanup commands to fail. -bazel run //:redis-cli -- -p 6379 shutdown || true +bazel run "//:redis-cli" -- -p 6379 shutdown || true sleep 1s -bazel run //:redis-cli -- -p 6380 shutdown || true +bazel run "//:redis-cli" -- -p 6380 shutdown || true sleep 1s -bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 & +bazel run "//:redis-server" -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 & sleep 2s -bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 & +bazel run "//:redis-server" -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 & sleep 2s # Run tests. -./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC +bazel run -c dbg $RAY_BAZEL_CONFIG "//:core_worker_test" $STORE_EXEC $RAYLET_EXEC $RAYLET_PORT $RAYLET_MONITOR_EXEC $MOCK_WORKER_EXEC sleep 1s -bazel run //:redis-cli -- -p 6379 shutdown -bazel run //:redis-cli -- -p 6380 shutdown +bazel run "//:redis-cli" -- -p 6379 shutdown +bazel run "//:redis-cli" -- -p 6380 shutdown sleep 1s