Split local scheduler task queue (#211)

* Split local scheduler task queue into waiting and dispatch queue

* Fix memory leak

* Add a new task scheduling status for when a task has been queued locally

* Fix global scheduler test case and add task status doc

* Documentation

* Address Philipp's comments

* Move tasks back to the waiting queue if their dependencies become unavailable

* Update existing task table entries instead of overwriting
This commit is contained in:
Stephanie Wang
2017-01-18 20:27:40 -08:00
committed by Philipp Moritz
parent 6fe69bec11
commit f1987cdc16
7 changed files with 356 additions and 175 deletions
+21 -12
View File
@@ -25,8 +25,9 @@ ID_SIZE = 20
# These constants must match the scheduling state enum in task.h.
TASK_STATUS_WAITING = 1
TASK_STATUS_SCHEDULED = 2
TASK_STATUS_RUNNING = 4
TASK_STATUS_DONE = 8
TASK_STATUS_QUEUED = 4
TASK_STATUS_RUNNING = 8
TASK_STATUS_DONE = 16
# These constants are an implementation detail of ray_redis_module.c, so this
# must be kept in sync with that file.
@@ -161,8 +162,10 @@ class TestGlobalScheduler(unittest.TestCase):
if len(task_entries) == 1:
task_contents = self.redis_client.hgetall(task_entries[0])
task_status = int(task_contents[b"state"])
self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED])
if task_status == TASK_STATUS_SCHEDULED:
self.assertTrue(task_status in [TASK_STATUS_WAITING,
TASK_STATUS_SCHEDULED,
TASK_STATUS_QUEUED])
if task_status == TASK_STATUS_QUEUED:
break
else:
print(task_status)
@@ -170,7 +173,7 @@ class TestGlobalScheduler(unittest.TestCase):
num_retries -= 1
time.sleep(1)
if num_retries <= 0 and task_status != TASK_STATUS_SCHEDULED:
if num_retries <= 0 and task_status != TASK_STATUS_QUEUED:
# Failed to submit and schedule a single task -- bail.
self.tearDown()
sys.exit(1)
@@ -204,12 +207,18 @@ class TestGlobalScheduler(unittest.TestCase):
if len(task_entries) == num_tasks:
task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))]
task_statuses = [int(contents[b"state"]) for contents in task_contents]
self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses]))
num_tasks_done = task_statuses.count(TASK_STATUS_SCHEDULED)
self.assertTrue(all([
status in [TASK_STATUS_WAITING,
TASK_STATUS_SCHEDULED,
TASK_STATUS_QUEUED] for status in task_statuses
]))
num_tasks_done = task_statuses.count(TASK_STATUS_QUEUED)
num_tasks_scheduled = task_statuses.count(TASK_STATUS_SCHEDULED)
num_tasks_waiting = task_statuses.count(TASK_STATUS_WAITING)
print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, retries left = {}"
.format(len(task_entries), num_tasks_waiting, num_tasks_done, num_retries))
if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]):
print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, tasks queued = {}, retries left = {}"
.format(len(task_entries), num_tasks_waiting,
num_tasks_scheduled, num_tasks_done, num_retries))
if all([status == TASK_STATUS_QUEUED for status in task_statuses]):
# We're done, so pass.
break
num_retries -= 1
@@ -231,8 +240,8 @@ class TestGlobalScheduler(unittest.TestCase):
if __name__ == "__main__":
if len(sys.argv) > 1:
# Pop the argument so we don't mess with unittest's own argument parser.
arg = sys.argv.pop()
if arg == "valgrind":
if sys.argv[-1] == "valgrind":
arg = sys.argv.pop()
USE_VALGRIND = True
print("Using valgrind for tests")
unittest.main(verbosity=2)