Fix multi-thread problem of function manager and Jenkins test (#3648)

This commit is contained in:
Yuhong Guo
2019-01-03 17:05:13 +08:00
committed by Hao Chen
parent ad2287ebe9
commit 4b23a34c93
3 changed files with 42 additions and 18 deletions
+11 -2
View File
@@ -509,20 +509,29 @@ class FunctionActorManager(object):
"""
# We set the driver ID here because it may not have been available when
# the actor class was defined.
actor_class_info["driver_id"] = self._worker.task_driver_id.id()
self._worker.redis_client.hmset(key, actor_class_info)
self._worker.redis_client.rpush("Exports", key)
def export_actor_class(self, Class, actor_method_names,
checkpoint_interval):
function_descriptor = FunctionDescriptor.from_class(Class)
key = (b"ActorClass:" + self._worker.task_driver_id.id() + b":" +
# `task_driver_id` shouldn't be NIL unless both of the following hold:
# 1) This worker isn't an actor; and
# 2) A previous task started a background thread that didn't
# finish before the task finished and that still uses the Ray API
# after that.
assert not self._worker.task_driver_id.is_nil(), (
"You might have started a background thread in a non-actor task, "
"please make sure the thread finishes before the task finishes.")
driver_id = self._worker.task_driver_id
key = (b"ActorClass:" + driver_id.id() + b":" +
function_descriptor.function_id.id())
actor_class_info = {
"class_name": Class.__name__,
"module": Class.__module__,
"class": pickle.dumps(Class),
"checkpoint_interval": checkpoint_interval,
"driver_id": driver_id.id(),
"actor_method_names": json.dumps(list(actor_method_names))
}
+5
View File
@@ -6,6 +6,7 @@ import binascii
import functools
import hashlib
import inspect
import logging
import numpy as np
import os
import subprocess
@@ -18,6 +19,8 @@ import ray.gcs_utils
import ray.raylet
import ray.ray_constants as ray_constants
logger = logging.getLogger(__name__)
def _random_string():
id_hash = hashlib.sha1()
@@ -69,6 +72,8 @@ def push_error_to_driver(worker,
if driver_id is None:
driver_id = ray_constants.NIL_JOB_ID.id()
data = {} if data is None else data
logging.error("Pushing error to dirver, type: %s, message: %s.",
error_type, message)
worker.raylet_client.push_error(
ray.ObjectID(driver_id), error_type, message, time.time())
+26 -16
View File
@@ -618,7 +618,8 @@ class Worker(object):
task_index = self.task_index
self.task_index += 1
# The parent task must be set for the submitted task.
assert not self.current_task_id.is_nil()
if self.actor_id == NIL_ACTOR_ID:
assert not self.current_task_id.is_nil()
# Submit the task to local scheduler.
function_descriptor_list = (
function_descriptor.get_function_descriptor_list())
@@ -766,23 +767,30 @@ class Worker(object):
use the outputs of this task).
"""
with self.state_lock:
assert self.task_driver_id.is_nil()
assert self.current_task_id.is_nil()
assert self.task_index == 0
assert self.put_index == 1
if task.actor_id().is_nil():
# If this worker is not an actor, check that `task_driver_id`
# was reset when the worker finished the previous task.
assert self.task_driver_id.is_nil()
# Set the driver ID of the current running task. This is
# needed so that if the task throws an exception, we propagate
# the error message to the correct driver.
self.task_driver_id = task.driver_id()
else:
# If this worker is an actor, task_driver_id wasn't reset.
# Check that current task's driver ID equals the previous one.
assert self.task_driver_id == task.driver_id()
# The ID of the driver that this task belongs to. This is needed so
# that if the task throws an exception, we propagate the error
# message to the correct driver.
self.task_driver_id = task.driver_id()
self.current_task_id = task.task_id()
function_descriptor = FunctionDescriptor.from_bytes_list(
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if (task.actor_id().id() != NIL_ACTOR_ID
or task.actor_creation_id().id() != NIL_ACTOR_ID):
if (not task.actor_id().is_nil()
or not task.actor_creation_id().is_nil()):
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
@@ -809,11 +817,11 @@ class Worker(object):
# Execute the task.
try:
with profiling.profile("task:execute", worker=self):
if (task.actor_id().id() == NIL_ACTOR_ID
and task.actor_creation_id().id() == NIL_ACTOR_ID):
if (task.actor_id().is_nil()
and task.actor_creation_id().is_nil()):
outputs = function_executor(*arguments)
else:
if task.actor_id().id() != NIL_ACTOR_ID:
if not task.actor_id().is_nil():
key = task.actor_id().id()
else:
key = task.actor_creation_id().id()
@@ -822,7 +830,7 @@ class Worker(object):
except Exception as e:
# Determine whether the exception occurred during a task, not an
# actor method.
task_exception = task.actor_id().id() == NIL_ACTOR_ID
task_exception = task.actor_id().is_nil()
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(
@@ -881,7 +889,7 @@ class Worker(object):
# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if (task.actor_creation_id() != ray.ObjectID(NIL_ACTOR_ID)):
if not task.actor_creation_id().is_nil():
assert self.actor_id == NIL_ACTOR_ID
self.actor_id = task.actor_creation_id().id()
self.function_actor_manager.load_actor(driver_id,
@@ -901,8 +909,8 @@ class Worker(object):
"name": function_name,
"task_id": task.task_id().hex()
}
if task.actor_id().id() == NIL_ACTOR_ID:
if (task.actor_creation_id() == ray.ObjectID(NIL_ACTOR_ID)):
if task.actor_id().is_nil():
if task.actor_creation_id().is_nil():
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else:
@@ -920,7 +928,9 @@ class Worker(object):
self._process_task(task, execution_info)
# Reset the state fields so the next task can run.
with self.state_lock:
self.task_driver_id = ray.ObjectID(NIL_ID)
if self.actor_id == NIL_ACTOR_ID:
# Keep task_driver_id unchanged for actors.
self.task_driver_id = ray.ObjectID(NIL_ID)
self.current_task_id = ray.ObjectID(NIL_ID)
self.task_index = 0
self.put_index = 1