[Serve] Fix flaky test with nursery double init (#6982)

This commit is contained in:
Simon Mo
2020-02-03 21:32:12 -08:00
committed by GitHub
parent 13882d052d
commit 5e8ded344a
5 changed files with 25 additions and 21 deletions
+7 -3
View File
@@ -1,6 +1,7 @@
import inspect
from functools import wraps
from tempfile import mkstemp
from multiprocessing import cpu_count
import numpy as np
@@ -65,7 +66,10 @@ def init(kv_store_connector=None,
blocking=False,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
ray_init_kwargs={"object_store_memory": int(1e8)},
ray_init_kwargs={
"object_store_memory": int(1e8),
"num_cpus": max(cpu_count(), 8)
},
gc_window_seconds=3600,
queueing_policy=RoutePolicy.Random,
policy_kwargs={}):
@@ -408,8 +412,8 @@ def split(endpoint_name, traffic_policy_dictionary):
global_state.policy_table.register_traffic_policy(
endpoint_name, traffic_policy_dictionary)
global_state.init_or_get_router().set_traffic.remote(
endpoint_name, traffic_policy_dictionary)
ray.get(global_state.init_or_get_router().set_traffic.remote(
endpoint_name, traffic_policy_dictionary))
@_ensure_connected
@@ -33,7 +33,7 @@ backend_config_v1 = serve.get_backend_config("echo:v1")
# goes to my_endpoint will now go to the echo:v1 backend.
serve.link("my_endpoint", "echo:v1")
print(requests.get("http://127.0.0.1:8000/echo").json())
print(requests.get("http://127.0.0.1:8000/echo", timeout=0.5).json())
# The service will be reachable from http
print(ray.get(serve.get_handle("my_endpoint").remote(response="hello")))
+11 -9
View File
@@ -32,8 +32,7 @@ class ActorNursery:
"""
def __init__(self):
# Dict: tag -> actor handle (before this change: actor handle -> tag)
self.actor_handles = dict()
self.tag_to_actor_handles = dict()
self.bootstrap_state = dict()
@@ -44,10 +43,14 @@ class ActorNursery:
init_kwargs={},
is_asyncio=False):
"""Start an actor and add it to the nursery"""
# Avoid double initialization
if tag in self.tag_to_actor_handles.keys():
return [self.tag_to_actor_handles[tag]]
max_concurrency = ASYNC_CONCURRENCY if is_asyncio else None
handle = (actor_cls.options(max_concurrency=max_concurrency).remote(
*init_args, **init_kwargs))
self.actor_handles[handle] = tag
self.tag_to_actor_handles[tag] = handle
return [handle]
def start_actor_with_creator(self, creator, kwargs, tag):
@@ -58,19 +61,18 @@ class ActorNursery:
The kwargs input is passed to `ActorCls_remote` method.
"""
handle = creator(kwargs)
self.actor_handles[handle] = tag
self.tag_to_actor_handles[tag] = handle
return [handle]
def get_all_handles(self):
return {tag: handle for handle, tag in self.actor_handles.items()}
return self.tag_to_actor_handles
def get_handle(self, actor_tag):
return [self.get_all_handles()[actor_tag]]
return [self.tag_to_actor_handles[actor_tag]]
def remove_handle(self, actor_tag):
[handle] = self.get_handle(actor_tag)
self.actor_handles.pop(handle)
del handle
if actor_tag in self.tag_to_actor_handles.keys():
self.tag_to_actor_handles.pop(actor_tag)
def store_bootstrap_state(self, key, value):
self.bootstrap_state[key] = value
@@ -17,7 +17,7 @@ def test_e2e(serve_instance):
timeout_sleep = 0.5
while True:
try:
resp = requests.get("http://127.0.0.1:8000/").json()
resp = requests.get("http://127.0.0.1:8000/", timeout=0.5).json()
assert resp == result
break
except Exception: