Reset signal counters when a task finishes (#4173)

This commit is contained in:
Ion
2019-02-28 15:15:03 -08:00
committed by Philipp Moritz
parent 4dc683d39e
commit 88e14feb53
3 changed files with 24 additions and 1 deletions
+2 -1
View File
@@ -170,4 +170,5 @@ def reset():
If the worker calls receive() on a source next, it will get all the
signals generated by that source starting with index = 1.
"""
ray.worker.global_worker.signal_counters = defaultdict(lambda: b"0")
if hasattr(ray.worker.global_worker, "signal_counters"):
ray.worker.global_worker.signal_counters = defaultdict(lambda: b"0")
+19
View File
@@ -316,3 +316,22 @@ def test_receiving_on_two_returns(ray_start):
assert ((x == results[0][0] and y == results[1][0])
or (x == results[1][0] and y == results[0][0]))
def test_serial_tasks_reading_same_signal(ray_start):
@ray.remote
def send_signal(value):
signal.send(UserSignal(value))
a = send_signal.remote(0)
@ray.remote
def f(sources):
return ray.experimental.signal.receive(sources, timeout=1)
result_list = ray.get(f.remote([a]))
assert len(result_list) == 1
result_list = ray.get(f.remote([a]))
assert len(result_list) == 1
result_list = ray.get(f.remote([a]))
assert len(result_list) == 1
+3
View File
@@ -965,6 +965,9 @@ class Worker(object):
# actor. Because the following tasks should all have the
# same driver id.
self.task_driver_id = DriverID.nil()
# Reset signal counters so that the next task can get
# all past signals.
ray_signal.reset()
# Increase the task execution counter.
self.function_actor_manager.increase_task_counter(