Enable direct calls by default (#6367)

* wip

* add

* timeout fix

* const ref

* comments

* fix

* fix

* Move actor state into actor handle

* comments 2

* enable by default

* temp reorder

* some fixes

* add debug code

* tmp

* fix

* wip

* remove dbg

* fix compile

* fix

* fix check

* remove non direct tests

* Increment ref count before resolving value

* rename

* fix another bug

* tmp

* tmp

* Fix object pinning

* build change

* lint

* ActorManager

* tmp

* ActorManager

* fix test component failures

* Remove old code

* Remove unused

* fix

* fix

* fix resources

* fix advanced

* eric's diff

* blacklist

* blacklist

* cleanup

* annotate

* disable tests for now

* remove

* fix

* fix

* clean up verbosity

* fix test

* fix concurrency test

* Update .travis.yml

* Update .travis.yml

* Update .travis.yml

* split up analysis suite

* split up trial runner suite

* fix detached direct actors

* fix

* split up advanced test

* lint

* fix core worker test hang

* fix bad check fail which breaks test_cluster.py in tune

* fix some minor diffs in test_cluster

* less workers

* make less stressful

* split up test

* retry flaky tests

* remove old test flags

* fixes

* lint

* Update worker_pool.cc

* fix race

* fix

* fix bugs in node failure handling

* fix race condition

* fix bugs in node failure handling

* fix race condition

* nits

* fix test

* disable heartbeats

* disable heartbeats

* fix

* fix

* use worker id

* fix max fail

* debug exit

* fix merge, and apply [PATCH] fix concurrency test

* [patch] fix core worker test hang

* remove NotifyActorCreation, and return worker on completion of actor creation task

* remove actor died callback

* Update core_worker.cc

* lint

* use task manager

* fix merge

* fix deadlock

* wip

* merge conflicts

* fix

* better sysexit handling

* better sysexit handling

* better sysexit handling

* check id

* better debug

* task failed msg

* task failed msg

* retry failed tasks with delay

* retry failed tasks with delay

* clip deps

* fix

* fix core worker tests

* fix task manager test

* fix all tests

* cleanup

* set to 0 for direct tests

* dont check worker id for ownership rpc

* dont check worker id for ownership rpc

* debug messages

* add comment

* remove debug statements

* nit

* check worker id

* fix test

* owner

* fix tests
This commit is contained in:
Eric Liang
2019-12-13 13:58:04 -08:00
committed by GitHub
parent 3754effafc
commit be5dd8eb5e
16 changed files with 174 additions and 362 deletions
+1 -1
View File
@@ -17,7 +17,7 @@ def env_integer(key, default):
def direct_call_enabled():
return bool(int(os.environ.get("RAY_FORCE_DIRECT", "0")))
return bool(int(os.environ.get("RAY_FORCE_DIRECT", "1")))
ID_SIZE = 20
+11 -36
View File
@@ -151,19 +151,22 @@ def main():
timeit("1:1 actor calls async", actor_async, 1000)
a = Actor.options(is_direct_call=True).remote()
a = Actor.options(max_concurrency=16).remote()
def actor_concurrent():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("1:1 direct actor calls async", actor_concurrent, 1000)
timeit("1:1 actor calls concurrent", actor_concurrent, 1000)
a = Actor.options(is_direct_call=True, max_concurrency=16).remote()
n = 5000
n_cpu = multiprocessing.cpu_count() // 2
actors = [Actor._remote() for _ in range(n_cpu)]
client = Client.remote(actors)
def actor_concurrent():
ray.get([a.small_value.remote() for _ in range(1000)])
def actor_async_direct():
ray.get(client.small_value_batch.remote(n))
timeit("1:1 direct actor calls concurrent", actor_concurrent, 1000)
timeit("1:n actor calls async", actor_async_direct, n * len(actors))
n_cpu = multiprocessing.cpu_count() // 2
a = [Actor.remote() for _ in range(n_cpu)]
@@ -177,44 +180,16 @@ def main():
timeit("n:n actor calls async", actor_multi2, m * n)
n = 5000
n_cpu = multiprocessing.cpu_count() // 2
actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)]
client = Client.remote(actors)
def actor_async_direct():
ray.get(client.small_value_batch.remote(n))
timeit("1:n direct actor calls async", actor_async_direct, n * len(actors))
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct():
ray.get([c.small_value_batch.remote(n) for c in clients])
timeit("n:n direct actor calls async", actor_multi2_direct,
n * len(clients))
n = 1000
actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)]
actors = [Actor._remote() for _ in range(n_cpu)]
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct_arg():
ray.get([c.small_value_batch_arg.remote(n) for c in clients])
timeit("n:n direct actor calls with arg async", actor_multi2_direct_arg,
timeit("n:n actor calls with arg async", actor_multi2_direct_arg,
n * len(clients))
n = 1000
actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)]
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct_arg():
ray.get([c.small_value_batch_arg.remote(n) for c in clients])
timeit("multi client direct actor calls with arg async",
actor_multi2_direct_arg, n * len(clients))
if __name__ == "__main__":
main()
+6 -51
View File
@@ -6,14 +6,6 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_direct",
size = "medium",
srcs = ["test_actor_direct.py", "test_actor.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_resources",
size = "medium",
@@ -22,19 +14,12 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_resources_direct",
size = "medium",
srcs = ["test_actor_resources_direct.py", "test_actor_resources.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_failures",
size = "medium",
srcs = ["test_actor_failures.py"],
tags = ["exclusive"],
# TODO(ekl) enable this once we support actor reconstruction again
tags = ["exclusive", "manual"],
deps = ["//:ray_lib"],
)
@@ -46,14 +31,6 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_basic_direct",
size = "medium",
srcs = ["test_basic_direct.py", "test_basic.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_advanced",
size = "medium",
@@ -126,14 +103,6 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_stress_direct",
size = "medium",
srcs = ["test_stress_direct.py", "test_stress.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_stress_sharded",
size = "medium",
@@ -142,19 +111,12 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_stress_sharded_direct",
size = "medium",
srcs = ["test_stress_sharded_direct.py", "test_stress_sharded.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_stress_failure",
size = "large",
srcs = ["test_stress_failure.py"],
tags = ["exclusive"],
# TODO(ekl) enable again once we support direct call reconstruction
tags = ["exclusive", "manual"],
deps = ["//:ray_lib"],
)
@@ -209,14 +171,6 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_failure_direct",
size = "medium",
srcs = ["test_failure_direct.py", "test_failure.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_garbage_collection",
size = "small",
@@ -276,7 +230,8 @@ py_test(
name = "test_monitors",
size = "small",
srcs = ["test_monitors.py"],
tags = ["exclusive"],
# TODO(ekl) tasks() and objects() are different in direct call mode.
tags = ["exclusive", "manual"],
deps = ["//:ray_lib"],
)
-16
View File
@@ -1,16 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_actor.py")]))
@@ -1,17 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main([
"-v",
os.path.join(os.path.dirname(__file__), "test_actor_resources.py")
]))
-16
View File
@@ -1,16 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_basic.py")]))
-16
View File
@@ -1,16 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_failure.py")]))
@@ -1,18 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main([
"-v",
os.path.join(
os.path.dirname(__file__), "test_multinode_failures.py")
]))
-16
View File
@@ -1,16 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_stress.py")]))
@@ -1,17 +0,0 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main([
"-v",
os.path.join(os.path.dirname(__file__), "test_stress_sharded.py")
]))
+3 -1
View File
@@ -14,6 +14,7 @@ import types
import ray.cloudpickle as cloudpickle
from ray.tune import TuneError
from ray.tune.progress_reporter import trial_progress_str
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.result import (TIME_THIS_ITER_S, RESULT_DUPLICATE,
SHOULD_CHECKPOINT)
@@ -380,7 +381,8 @@ class TrialRunner(object):
def debug_string(self, delim="\n"):
messages = [
self._scheduler_alg.debug_string(),
self.trial_executor.debug_string()
self.trial_executor.debug_string(),
trial_progress_str(self.get_trials()),
]
return delim.join(messages)