From 0c0b0d0a73fc7fed019b164336382ed6ee3c04ea Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 3 Sep 2020 12:45:24 -0600 Subject: [PATCH] [Core] Added support for submission-time task names. (#10449) * Added support for submission-time task names. * Suggestions from code review: add missing consts Co-authored-by: SangBin Cho * Add num_returns arg to actor method options docstring example. * Add process name line and proctitle assertion to submission-time task name section of advanced docs. * Add submission-time task name --> proctitle test for Python worker. * Added Python actor options tests for num_returns and name. * Added Java test for submission-time task names. * Add dashboard image to task name docs section. * Move to fstrings. Co-authored-by: SangBin Cho --- cpp/src/ray/runtime/abstract_ray_runtime.cc | 10 +-- cpp/src/ray/runtime/task/invocation_spec.h | 1 + .../runtime/task/local_mode_task_submitter.cc | 6 +- cpp/src/ray/worker/default_worker.cc | 7 +- doc/source/advanced.rst | 20 ++++++ doc/source/images/task_name_dashboard.png | Bin 0 -> 37975 bytes .../java/io/ray/api/call/BaseTaskCaller.java | 23 +++++-- .../java/io/ray/api/options/CallOptions.java | 37 ++++++---- .../main/java/io/ray/test/TaskNameTest.java | 19 ++++++ python/ray/_raylet.pyx | 26 ++++--- python/ray/actor.py | 33 +++++++-- python/ray/includes/common.pxd | 2 +- python/ray/includes/libcoreworker.pxd | 1 + python/ray/remote_function.py | 5 +- python/ray/tests/test_actor.py | 31 +++++++++ python/ray/tests/test_advanced_3.py | 34 ++++++++-- src/ray/common/function_descriptor.h | 32 ++++++++- src/ray/common/task/task_spec.cc | 7 +- src/ray/common/task/task_spec.h | 3 + src/ray/common/task/task_util.h | 3 +- src/ray/core_worker/common.h | 7 +- src/ray/core_worker/core_worker.cc | 42 ++++++++---- src/ray/core_worker/core_worker.h | 2 +- .../java/io_ray_runtime_RayNativeRuntime.cc | 3 +- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 7 +- src/ray/core_worker/lib/java/jni_init.cc | 7 ++ src/ray/core_worker/lib/java/jni_utils.h | 5 ++ src/ray/core_worker/test/core_worker_test.cc | 16 ++--- .../test/direct_task_transport_test.cc | 7 +- src/ray/core_worker/test/mock_worker.cc | 7 +- src/ray/gcs/test/gcs_test_util.h | 7 +- src/ray/protobuf/common.proto | 64 ++++++++++-------- .../scheduling/cluster_task_manager_test.cc | 4 +- .../raylet/task_dependency_manager_test.cc | 2 +- streaming/src/queue/transport.cc | 3 +- streaming/src/test/mock_actor.cc | 7 +- streaming/src/test/queue_tests_base.h | 6 +- 37 files changed, 361 insertions(+), 135 deletions(-) create mode 100644 doc/source/images/task_name_dashboard.png create mode 100644 java/test/src/main/java/io/ray/test/TaskNameTest.java diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 92125a580..d2016863c 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -67,8 +67,9 @@ WaitResult AbstractRayRuntime::Wait(const std::vector &ids, int num_ob ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr args) { InvocationSpec invocationSpec; - invocationSpec.task_id = - TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + // TODO(Guyang Song): make it from different task + invocationSpec.task_id = TaskID::ForFakeTask(); + invocationSpec.name = ""; invocationSpec.actor_id = ActorID::Nil(); invocationSpec.args = args; invocationSpec.func_offset = @@ -87,8 +88,9 @@ ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, std::shared_ptr args) { InvocationSpec invocationSpec; - invocationSpec.task_id = - TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + // TODO(Guyang Song): make it from different task + invocationSpec.task_id = TaskID::ForFakeTask(); + invocationSpec.name = ""; invocationSpec.actor_id = actor; invocationSpec.args = args; invocationSpec.func_offset = diff --git a/cpp/src/ray/runtime/task/invocation_spec.h b/cpp/src/ray/runtime/task/invocation_spec.h index b131f2fd0..b436ef5fc 100644 --- a/cpp/src/ray/runtime/task/invocation_spec.h +++ b/cpp/src/ray/runtime/task/invocation_spec.h @@ -11,6 +11,7 @@ namespace api { class InvocationSpec { public: TaskID task_id; + std::string name; ActorID actor_id; int actor_counter; /// Remote function offset from base address. diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 6603dc1b6..8f2f28f2d 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -30,8 +30,10 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy std::unordered_map required_resources; std::unordered_map required_placement_resources; TaskSpecBuilder builder; - builder.SetCommonTaskSpec(invocation.task_id, rpc::Language::CPP, functionDescriptor, - local_mode_ray_tuntime_.GetCurrentJobID(), + std::string task_name = + invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name; + builder.SetCommonTaskSpec(invocation.task_id, task_name, rpc::Language::CPP, + functionDescriptor, local_mode_ray_tuntime_.GetCurrentJobID(), local_mode_ray_tuntime_.GetCurrentTaskId(), 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, required_resources, required_placement_resources, diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc index 2b841012e..81104509d 100644 --- a/cpp/src/ray/worker/default_worker.cc +++ b/cpp/src/ray/worker/default_worker.cc @@ -29,8 +29,8 @@ class DefaultWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -51,7 +51,8 @@ class DefaultWorker { void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index 52ce58bd2..3083fd9db 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -51,6 +51,26 @@ And vary the number of return values for tasks (and actor methods too): assert ray.get(id1) == 0 assert ray.get(id2) == 1 +And specify a name for tasks (and actor methods too) at task submission time: + +.. code-block:: python + + import setproctitle + + @ray.remote + def f(x): + assert setproctitle.getproctitle() == "ray::special_f" + return x + 1 + + obj = f.options(name="special_f").remote(3) + assert ray.get(obj) == 4 + +This name will appear as the task name in the machine view of the dashboard, will appear +as the worker process name when this task is executing (if a Python task), and will +appear as the task name in the logs. + +.. image:: images/task_name_dashboard.png + Dynamic Custom Resources ------------------------ diff --git a/doc/source/images/task_name_dashboard.png b/doc/source/images/task_name_dashboard.png new file mode 100644 index 0000000000000000000000000000000000000000..4465e557b0c27b41eebc8e46d4d0358b2b580736 GIT binary patch literal 37975 zcmeFZby!qg+cr)rpoD}dN+{B0FcJbYgh+R%K@KU*(5WJ#(%s!KDBU2al!(NDG)Q;X zP~RH$es16QdE@&Xzkj~xd%ycQ)VXK&>^*z!wbpfB=XqXNfQr%sQW82692^`{Ss5uc z92|Tm92`6{qBGzfSgK_l4i2ugrKF^atfVB9ii4e*rL`#z4twa6hYu-bx!GGEJ$m@C zwUdj3#KBqZ)vLE^551cj+ZtONn;SnhM#LBxOkARvz}INPNmu<)VFfFL+3V7p^zTin zmnot40&Jl5JBuBstZ^}msyNzm@Ta#!PVhfkwh`WEIe-2w2dvz|RTlTvD6YvHYFv6k zlP=u%z1NmWnVcwJcoGz_-e|qtT90dT?uPjdyv~OnxF$zfawLh7|so zeVHYU;&D~z`SWHwu+L&IbR1amo5^faD(0?#?3225iJ8-4Aeu6zzC?8E21*ht_)X+ibC@Kg)v9sZQWMcQ&l-I?^9{Uv>5f@?b z(#90=h{?sq+SXCnMfCd5BZR?g?Av_TnSLIEuoAtlqo~3pY3E?d1mWf9y?I@Xgo%ks z#KFW&SWQa$_t(MyiC(urAnb+t_?(@cd7ba@+Bul>-4YTK;=6g9@AhpTa0HK|t1aS@ z3y-ZM^RJWqbsi~G$0rV!_6SQmTPEyzA3e51B1ErW$6n|k|NZ)%rY@F$U&+?-_ice2 zHX-#^X`zAA!!S6Ic;#nf6y%F+f554eVy;LV#NKac;XPyW8*uWxEPnmS0@ z*?=!1#Qq-N-(UXo!+-z9pO@7A`;w5`cmKTPAD{gGrU)PQ&i@#RU&H+KT`H6xVI*HAgIw?&Ia*gXF!6DwNP(RhQFv#p-a?*QG<3t8)oU z?uA_)o`wT+HHi|b_O~hVh?snE{`xC~9Phz&^WE`C=Ln0)UEb8rJznluRT`0Tej1>$W}x-BOfeYNV`kor{)rU)%m{por$`R>ku z@f=+Dq6GMzURJn-7w$Q)mR2K|^!8Rq3mxaaY89Kvx*u({tkfT_I>wXxN!TXUx$j|W zN~{KWxwKIU`mMcWMR_2Mq$l?KG@6oTqOWf{riO$wb{e@2j zTzh2Ms!XYK6cPlxJF}bU!7{yZkqJk3S&|oz=mnwE+L#Ta=XNORmAhZY+}A{h8@#$p<>OXLuY6*58r|ElpMc}K~$$>dEm>jJ1#z}7b#oX-bQF&?~snye4 zEmxoHEDcm7dtutN3iW7WKbaFyyrN7Ym!T#yz5rv&_kl~I8cy8wYhB()@){16TMs@q zPxdT7$KIzfITZ}O8@V6Jq5fGeg8fn6#{9R`=R3=Ty+&RKrusEbg}LRX+w*~WJF$>;N2#`U$D1v=ecV7Mc8*MGd>bAntuN6T9p zt}G^$X&OW9t3b_!)Aq6C>n-MWW85-N+o4j+i@~jQ=S_3yOP>X2C%LuL3D{cap)2Q- z+((=VFBjErzScKdNe&iXWFKI_^eRKk@XwNc*SD*i5q9R)E&iCLlo)4v&qEwMd@?rQ z$!7m}BO|qz%@5Dd4qteXF{UF$|2-0M%zVuU=7EbBvVV7)iNeKK!jELobuKY_MsV(H zqE)+g+yl@#Ab84ZP)vB&%3|Y)p2Q{ zDEXY!5zRU-*eqr6ee+S(?KG|IuT;i^@-*|~XjL+*4@=BC9ZLF9hWF6-W5|8qV(+@O z_p3PbdcF@{h?pnFWYT3m)!!Nn=SeBVsy}Anq8^ndU5**?aJh}+mPJhalVH*QF^?_9 znyH|B1uh#uPzPvIfsu)#=f563Z_1Lv^E0Flb@6Z%X40QAE-qHu=#FEZ#6>BlM!ehs zt}~Z>B_(lu_wnE@wTK0^fnZ5ToALV5C_dv=jN^PwoRMa}R_B<@6sa~>+O74&-IXhj zFL*kYv5V;0^>>e&&uH&&PKPU3>pc!+5OkQ9QJ*9u#wGZ1j>6|-#kI4z1WkB(Z7OV= z!*&g$S&H$qW61D2%wm3GFvh3UX1IUM`_yx|@@e|vBNTJ6NYCqwJd2w&jXC8eJsGkn z{ChcI{=ifS>W_Ez2iTOFcZ}>m5N3tqpt&Po;NS%iT0D#V?r%VF+X&|S8MCuw17@SG z#-%`_bh@CIq*sLF#h;%!fR1GnFRE{D_bMOa%DlPAv;3S~>>bLmX6PJW?L#`2m*7N1 zS9$TO%{=!;or#^Z9cJ1GXLa8{zTafpDuo>vzb2Mg|Ji}hKkS}gC7>Yk@!_RuINE%3 z<+VWnq{Y14vDho=rq@b(TFrZTckY2(|GCNC*7w1=QPKr{5d4og>jSGN`F8e(an<0Z zKO$``4+yG*bIIusz<~bVADN5^iC~+N@qyR=j;tYrcmzzbo+mE) zipfj`*ByPIKW`ex*Mo|knk3s}nC=R8a{VJ#yuFx0tR%gkT;mkJa5XuAqGp_~_z`lGa;4 z(cIOqV21c_4L%z8QuhtKA}~ayEb^$vX{8Y)4F>S)bCsY0IR>B%v% zd0?b>2T#R!uz^wXK48ToGNF03QxSm-8b2AfDylOBarAga06|o4FX%%5tob1g(X<|z zqyq@d2qb>pP{;(a~0yQ!}YRrDw13TICek@cSqJ z71h>j!u4Mh@6}9M_7}tkGKdOyhy-lfH5@xt6pnM={8G&9A7{_^dQybVVVhsSH)Yp57wNJXB zr?ralcRLw8w^gA~%=2f==q!bpn;(okHtru{mWo!uTnI|)^c=5p%+9@-?s2fWJ7}J) zTVXp|;C*^>_vw$15t}sB#~Z=kh~a=LT-NJ8K2$Qh6;S~ScC{NXA=cl&0o0bTvX12O zSjdbvph#OPATdPnuv`lXz;076{dbmQ1Z;+;84$*+dgl?N6R{KbVPLL2DaB z&>=3eon9K%y8usAA1OnR`#LtqAzk|$Q#Jq>o|Mj$1HgqKpB|zW&3V8!8Zzd&XLv=( zA;S}#z-u?&t{%+Ay@um8w@U*jQvIARv3g46CxLX3P1dTo=dpLpv77sZ@|&X+o0A1) zZwD5~NG|R2vy2|!G~Q5Fb+#Vpms?vJdu;fPg5Vw+K05d54QRP1^&_EKetYNZJ8Tf==h0H55*jT4F?ebAxS8?fA%I~BrT4R^ zt2Jv_FlU>jmMJ%A8f|C>&|Ol)iOGRpc961f5W_neA|?ubrpN{sJ6XduMCWVkO<~tf z6a5miK!G<{Xo33H7@x9Kh>EPXwbT!l^}9Iku@zXiH|Cz@f1(5CrtV$jq6U;}Y)8|N zz1peBsBTF5?WIZMJ%r+b7#M(dn+{{u1oAP_6cDy=&3%I?4d|lzqj~iEvAA>G>#z_6 zchT9SxSTi(LLOlzLhkVXkN~ zCve~dflB?>tTGqwXep}l2gx3+=&YG$pgYTC_t`Z6k^G|iAwr7kg&fOJxo?q($u)F- zw7W%rC+}ufuE)-kpE%7)Noa;zu|2BP6XNZ5yfa`kR_lriP?=T$NU4IWsJ2IRe{nX| z{~_TSx#|O{Gvq(v?p48WK|-@Jt(uix!dTp-xAEiCv52TovdfQvlf8LzIAvy*wvpOH zl?<6s>z0L?rPP!8^x)lkGgs~KeAAAogEGGMk#~=tqYHFa|3XUX>gIp^30OUD3avA=LhE1q|yEuC~f2>nM%9KSuYI`o=D8)T;)mX0xo(tBw2UJk)TZRNH`f9$GzM=-Y z4=t~rrSMd-wi>bUgXB$L17E0bRL8=G+wc#1UjyP7j0q$+{Op!f6Y5D!9t>LOojYnc z-8+?-%#u2}_2Rj|%6{5Py+31^73`%QubI zPI4lk(4&V-S$@MXfBv=mw1(6qzeyD`N~IGPR5(EU)?u}y?8tWa2)tr#+N#hL{nmPC&gH8_4dU-BwswZHZt8o-wTqt0hN`ZGRaB* zH!79MZ=mwU9-WtLA73Qdi`|Hx#0MdcR%XVoO`mhbnTwk2_Q*iRm|KV8coX^eRrg90 zF=}2xVn3@?$w@p$=T4#l0{1sBn{XfGC?yJCTN9>0K}h}L^)LsRl%-dwsmZLkRQpb= zd64F8Y*85+Bk6m~o69Orj;c}q_yr-m^0EU7>-cW$BYR(x*)Y#nX(xR+-HJ~QJIXD4 zKO1&5mo=r@Ln$hZykEW|ubxW)8Su5ylG=r#b3^m5w$EYt>7<4QC{VRqj9%8FwmFde6t^Pv5y}GGjsec%|fPp*CgitX>|e zYPL({NiO2HZhZxQ-mNeefVUr?%fM7j^Ays}19}#}x7B1}k}c+!RZ01#xD`ZLQ=xYi^WYVh)MDQ-;Q6l6f0}1*U}>xna4) z=9M>F`Z72T8mHrC-C8J*QCanqJ}oOx5J`K6EjecO)w~Osw56I|0j-syxSmCZ6g?(B z%ZC@xU+0G|lU*uMP8Rbr4!5^Aqv6s*9Hw;1RkvJqUw*&pc>8$*LypZ{DC|Jpe` zU<@i6F^72dpip^fCoD*R_;_^9YfiC!qXwg#6Ic>=Cs(~Xy3`2CsojgahASzakhPF{ zL)cY7rLfCtHxs5hcU!BMFp+#`vb)(wxR)#U^i_Fxkuj9^=tlcqd69p<#@7Y`=o*np z9IUvZ%z_+ZrKyevghZ9ZSx8ln@V0YN=qh4GQ*BZ?fRe>vGO&Vdhwa{GEA#QJ>e%+Y z^NzPVMY>fcCTYPfW*I|QN9L)`NV>zP8m-vw4U1SHk5AA4d)?u%>&WJ7t*#Kglg7 zr-kMsrc{l?bTUu%lmv`Nzc_|zIAnzh@#QMNdKM#fEM4pRq|QoLcQGqVFS#{iW&ToA zXZ=x|FqCst!<$spMT&LhIXtj5fq^7qTbKNXk&W8)k+J7aDwF>s*Lx$9(dhW z`$ogXa;J4+KEf;#6`oRLFVYd8t{% z@Xx89yWm0OT~=#BCOQY!6#1Vc9bZCnD@PR};p3VlT=Iag`*;yV3Jy0}w1Z0-VS8_L zDcnmlCnZZ(j`Jqb-l#)Ecm~-7ov2E)Z9Y|boGPnaHADV{WMc1Ga)xDwcJ-=y=_2)k zdxqgVD66>D%Xy*fAd+tvpezeekg*f<_{jz;1ZfXa*7wCKPhO;Ep;7^>PF&UnJmg6A z#YN%VaVW{2_ERhMY{x=*Law|W6`sJjx4F4G1rcAE*jlbw9VOuWb|6#8qU+5U zq;hv&*xgRGCyuLVk68o2g1X+fWtn?n#Sy`VKw{1d1llVtYA~R#k@$Htbr^eB7M71+ z8&AG;LXl79R{8NOeROS_@#59IjIT+H_KduBj`Z@)xbm$_Z;L#TmE^&g%wXPJ$A+(o zyNZ0s`ty7A8^tA^j0~dO)C}F9gKs%{kGm}u>Y9-bG`-ZTuw6MBLyGnyAqEd8yV8~BC(pW^)`+Qu zg3f!ih1!Yp&3g4HX>Zp08K9^*JZ>yxa2-7uSt3O{xN`o*52G)tARD{Tnh%H=BSKvU-)0qWB@&_45cpXMnfcMBe{ZFQkpTK>)^#rD5BiCn0$k<)pc7K-9;~!~5pnqQ zW&n+7n4-d-u<4m^P4V|V;u73e^I_Gw>^dZ5@eeW%mw-gt2Pfm<$ROt*!*-biq=uo{ ziO;3~=Iv4*0zr)Ab7JB3KRC=#Ko0yXpOAwXOT1+s+_uS5VK;H#U2ZFFo^|>4yop|b zdBUW`eD;ErFI$pJ6RSc@$VLBhK&QMojOB(-Hr={CP^fQCFN}C+SmX4m)$}hF7VviW z$>tKDj8n8~_;#m17!G5RU)~+Ib0z?KLdjag>DU~*etCf*puYPH^#$&o>`#qu@2(6F zUp7y81c(86Z{9LO z$36#MncLMo>@By^ui5UAKXwh9Vbk*lnw0h0Slvl=E&0wOx(8P; zfi3eV$(Z4b17WhX5BvV<3*~n}9tB>y)#SFfYV+Xg#!_;Q9A>!G@(DKmECQV2a8IUu zA-GqkiALPB8MAt!Q|HlY@9(LY&Ryd7XmeDrN9bK4fJ-C2U~Oc z!3?4!!VtScy)pxzVy6{70r}!1lO^qmt?mbmrmmVm1|+u!3GU;3t-|6ivBNLJEMvcy z9TO>-cs&z0-oNorZcMxX#(H;oFw1{ARni|rK6maS_q$N$2SY%JJDwFMp)hh^f#}z{ zm3cD(ZN~|6_MX%7dbTU0M!HI_8jE`2eG^X2ya=pNRTp>HW)Uv}(55|6Jo*=f^rO1Q zZkJgNlmog%59lieRs*OL5kP(-ecwW_{g8~kZEF1`=GLlCDYRSm%uvfk9zC75Fc!o0 z)cVUfPI^}AM&|Jb49^R;ZB=z}R*~p5uw$1QDs9UxMR|Z{zObHjNvT629 z?9^HJ?v8bOu=TvU_#Lpqxw=b$7F;EH+1IC8)((J<+O_s^!1r;#$$cm*{;J1)t!@sr zF?OB=23TFwanHR(E6RMlY+wAd)ysb8XsacXDOu!PO16S<@E+k}e%&`oj%@G80IHSm zPlAyUCV=64Ty_jq;NRb+a@|*Nc=eOKx=Jp#pJJs2@x&mU@ijQS5NajYiz1-HU56)t za*c_T&OP;K*j87>il^7_zpiA}j5hYFB4rd4LEkUw%obw-yg+`wrs@_K`piO>e!YjQ zfL-yCHJ8Foaq)1utBh{*soaj5IVv9zTiO9Lmf49>EWvzWS;4q{$)QKzXy|eSvPt-< zLkt|+Sl*j;Zt&9xE(m`IIwDr>&d;r1$cjHYlnByp#Umon0P)QyNr!Byf;LC1Fz%dK zDa$BA13yjexGkKm#!Q!lSEH<`FGH3}J+fBrd%5lCB3w}ek2aCNv;a=s7b_x6${_On ztH_o@bap)>yeZ@BVkiW!yFRq{_hi{JTw87zA{^l*)_D zC~t2+yJ9!^BuMCUkQip3yK}f!1iSWpb?~rSCZunyOkGhENiU=!>vt_rQ*|O$%#)7$ z1(4ZvflS}F_(6ccfhQ&YZQd}1B!b~{u*jy&skm`xCchzsb+IL-p6WfEm-^~Fjn(&| zAo_blfdXUmn;nr{(fP+30ME*DShQNH9a<8q!-KLihA09RGKc7QRAr-*7kA6L)@rwc zj<=Fezm&hiyFPst8i+TB2v9!vF{M>a44O}?v znVXpq%7aElu>~lVD{wJDSuktmHdj6)=F7btC7iuPlfNsu#bF^d`GU&8T-r`(&5*t> zGJ1GIa!FMsB{YNot({T$kZdxtU-tx#2nQQi-qN|WfVRzJ!f=moH)|SBkJp}@v0k)G zru7a);l!M!|p!-ReEn=9c&ePV(}JI*+T3v9_(pLMnq-WBsqNQ_th_sA^!B86*3(;`8HfUy8#Sb7f zkMAQ!)yMe_rywb>!lH{<)1jwFC8U>bHXc7*6x|z{5>VVJ2go~?7hr{jQ+&Y_$?GZg zM0AArnKKjISF3hM9J^xW25ZHbhGYN>yWGxIR9ey{HVo;0UGXSzIpfNYR2gNlgN%Gp ztRSKwc?#Dtw2R;@G(Ny`ps2f*F6 zLw^VKV=E#D(#BL)L%IklS@D1~M5>CFm>7!;DMF!O(cPE^5k?AFV}Xj`<1TkQjT;wW zjl}a^vdnoFCIvdO$ATJnzJbuAGiVzJm)%w=%sZSP#|x-Y;l0!6+_dBz)}dMXiGSo> z+^bLWZJSd((H7MOS0HecW6aK$l_6!PG6RUa6dWnIced5o5P=jfKu)L*&98N(7MvfV zS#v{Km7$WiH`YJ8JYYhCS*%v^ScxEvWZsN=$t-ixbEiQ?e*(l0R)b)LGPOg6k~->}1q`h-~lAVILzWV$}F~O*~Pso|<1|XdAy88MMQT&S@2n&rI zM=AIpn=t;7MZlPF@cbkusePs+7TE);$h>pA)h zds{MrRrf#Qzz0lNqZEDXRQr;+pTxD{5_4rE?dHwjL!p8L28Yn+E9(WY`v$Njy(6Y# z&A4sl+g0)tr-xv1`sA1SnLm&^i++R$cC2$7f%dcvbW0^O40FX7zKiY4gAN2)@j zI=B8R$2sodsdL~7hg<;}=19)_^LSOXz!FhMFMoYKo@jZbsu$>jkAQ!zoH=NJM_GgA z*k-8oJ%@T$r8W?jZ5DbmfuIN+QkBOlj)hp8+aYz80ucBM0e#S4jkMB#|JV5TBnh#rUe{2d+?#s+;b}RN1O6M z(AAAPxZdE>(T6e|cH3RH1fdUW^TCvSNxU~ec>CnVD{|BJi1)Xie6D=beImtn*@98O z)Z!6P)DR#1F=ie+Xl>hmlLJ>n?^FrDFs=PNeg+F54r;GZ!7<5y{N zti8Nq`>ut-{R`GScsM+MG7HbFTlrYc=Hgn*>;rT1y*|ixix8JU?Se{pRsa>-(EApK zCArUixAZbMkc)j!-+=VvTi@9!|=!r#kqLW^+W*F9O^^xXVnaTRZ2PYsJR{8Sv znP~d@k;Gx`2loJW8-biSHykF~e1G@h1o$R~fDO|IJm@Ta^J&p*tJ1At!m`0giB2z2 zv}VEpS#GeN`bo1Rm5w5EF~7Gw&Xl842c&EI?!f(L4Xi3zUgY`}w#YJ85?EUJ%P7sj zCXkh1NSv{KTiwl}>L=PZ zF9bwvL4G@|Y^`v%hUXHL>WNq84jIKBh0Z3Ezc5ta0W-`rRt$H_B-^T%&3~RHW>smVs4W+@R>(Hy&e>r#zJo(y z6@hYDG8M)mJ9$`H^CEuQ(k}3(OrX$tRlrwLVC;49sqa*&^Bsft(bGLqw&y;TG#li2 zM1la|0ZV@4H3}7Dg5#SLzO-&K(2j{6el1Rl6xidp@T~rZyuqk^gOx zm(I zA1EfxDz0TX!&|@qc!2x*Af^@TPoyNi3K)y5Ydw&VS8*Q}-3p__;1a01UA=+xTH^k2 zS4|!2!Lv8ItJ{E~v;Y_ zR3t~!iF08pU_W>xI?nGTs*p+;#b2=JVW)h=7u z%${gk3x;o*bN5C+L=n21H*tgQx%adaRENx$>D)I0u2%EtCmqhXqOC0?Vtv_?$9&t8 zemwf+3YEr-(=ILa&ffu5MpJUft;x?ZCf8W6%W6GU@pHB6gH^fWqgK}^oBSWFOQ1ftHD0g9xnh^+t;?cxt~9FpLgl z1R4J$4!j`aJ6hv>cC&KRO(j*O6|kS-1L$`mzlVa_2Mi8wN{K9p;lMw|X5I9HIQNU* z#Mbh719VdCMu&2xz2F_;?>ZH>iAdlk0r)f0T6Gi4??ezdt&Uip9?d~lfLEsi`+|d~ zSph)Mdsqls^`089A-8e9JKY8lo&Ck8@)e0@opHK=Og5_8yH@9BQ! z=0l%TQHNN-Me75i+6KgRFE36;Kx>gW^(2c1UeL(V#cmcBYD>z@I zX_Co7TmLDb3kLmYjpqBkJ0Sz#0U04BWxOJ<|^ z?!TsPTY>A40sKM%$|`UJbSYYuaU+}lI}Tfs{P--{Rr{LkSv~i?)g5PuL+r`%p?-JT z{X*bzD1ZEk#5+lfakevlj$zaP{N=}QX!&+4rg-jnpLi^Fr~B(?&lYOtM*)1)27LZY zW`L`*K330Gf}rt4*BkM7!C(#p9AyQ9FF&L~ZLufw4PcMj#SWM6>?L~tgli<$qt$3E zYzA!Et5K(68?7{{fDzQ|84%nE!QfMe#1Q&g_}+hWu_`{2q1c~{ei53$r}M5+I_RpU zo6Bz8Sq_-XO4-0sk9(mNh+}tcM+AZG^}%-D=wGn9WE#ZPc22E=c_UyPMdG#tZqpI~ zoQ~VZ)3kQ6z&)tnl_XjS9QFNWR@&ZCAOop}eNLv>NL7fvjWz5#a}OoY-@spa0zCEw zCT(H7`zrS90H0RrhR?9+pO#woF89dum4Pass+FS^J8wsb;B9TJStwU)I@Ubt>G%OC z7NBSi^nq&uXDp$*dN&|;M#N6`pLjz^6j&AGrr|!3w22yT@(+;oYBuk+nW#k>$FcP) zU`A_^*Ix?>`wh6YNlrh}8h;0s9VSB*ik^+*>Zj zz|0u&;_&HLp}ESTAV3P&+^0UjU{>%Gc83?9wlI40cTQ!3tZ@j~N+;l=wU*ijPyIS$ z@3fHb)-C`*jf0m>FpFEh|BkQ4aCxC2_}RP@F6$r88kXdNDyDdhMGiGoI9E4RZkH<4uKwV2AOGV1RIoES zCSW%3OTKYKz148RPm_KQUkVGgK%!qn+d?qIA-#bD2^c1ut;N)06B_Q-cnu7cI_m&QGm_`s&&fP2QQwi5tLc=d$}I7|LBZvN6-fcrGuHORbgxByqp)f zE?po8R_&F=t?ST8VSrM5oq81os>FDWe9~uvg=R0^!_23b@Y~6k8~xlWP4GB1)KXuQ zgdse&)xM>gy-W7k3^+tBC6gejJ=k;G^3M4qF&xck9$J)4ILGW<8aL zgFQzr^K~TfAsgG-LUT$Ebj=fI_>5{@o~nOKsb>O->M;nNUqF6Bg*fM>Zdrb4`f{$5cIUfVs}R(^Zu(wGL$r6!uHOz5g%sEfnUXYv zIK~O7spriyIo(Amtoo>rS*1<93Cl>e{GXyJ_<`WKDz=z4i+uL>p8D&{i+579R5(M` zNt=KJ`3EM=<&9NFUc$-UR}}?uhPlnPaL+yi@nm#`RBvS1jQf)8HtLZd6f@5}P^I3y zG3MAM>g7nplkKS;LJNSs)DjCh*v4d7Q)ZLtKCFW>G-MgW;#8KI7Of#7RA1wOL=ao7 zk`x?3Af>?vhczUY;9Z3E@K$|LlzVG$h3PeoX)0qH`^7Hxl?=7b5*nSZh!o#>5uZ{CD{h>_5u`aQ;OPd|hzlWGcMOcKGUVaE^nAOM$KY z!nu0Z|1ZM&=P&`*nyP?bP0eB(*Hyjw3&T9Wx zF92*F_+Q@`9>+TXNMQ`xM&z8-B!j?;uvtwP^}CSJ7Y~434jjbY02n78fW|KrV{TrH z)IH~sbbPqSHsTLZG+triJ!doBQ@}zMNcxkN10I*V_W+BVv1yZk0}gKMq<7Hw=JV8- zMBPfU^;LT1)(b)P{}I7muLMd%Z5n-WPxe|(v8B17%Bu{NBOM;(dVH_2OIA!08MLiG zFuNk=;ZlD8b1);;+u?AC29>Yz5G$=HSIQYiLHov)wW_%!D_}+_jkm3C0|lc|rUy@_ zK4Xh;&EvAYjxoD{QLH=yrET2+^P7X;|MfQ1`1e$pSO*li&{5X~_n1d`oGXmC9>3TD zm19AIKL((xk=qr(L!YG(`@;z2M-`d!(Qyoz9j2lTy%$*eTkoH&Df?SDFgO3bIVJYr zno|k@jxYahPrL!z@$T>n*lK?k5!u$P z+&!F$Ha75z<~1CtP4?W4J^Tv&ow0^6fDC@&Y0~!&P$~~Hjl1r<)_W5ohWTGi6!5ls#u_{F8aR!_m@RS zuoWqE{1y`on{}?UUlW(`F0Z*yyZ>*PSWdpg-Z4~p+I^q4YDP0Ym8nhNsIv=OCV@>O6}!P<~2PO!DqkO6}yT>vF+qc*VWfDHRP(~gT*?l~2R9xj&v54W85A?A2* zyv7t1hX1DqmkCVPg5$P4WC)OZ`^jLEo^tQMfVJc01ymlc)+lDKb0$Jv)>A|QCPT@7 zi-TA#1NO7{9B*Y~x!t%eaEO!(jJY%ecsEmY`lq+WJ8n;${5#w>k{et0-Wm2EaqS$U zM{GmEfN5_*I~5o=jPVpblu<0NH4U5LVpk{ij|PnX# z{{{h~P?ZV|GK3MD^SM!2mrEOPxwQUrx#a&lmrLJ`U6756DQAy0!ZE7U9BLmSxz$IT9ZzYmUM9usKJ28CHi9`l_C>jZPh1;Y268C$+4Jj z_CC`(!1)9x)Q&z4k-vHPo%|?C2vZ(JCEEtlp=O_&=a~u8!kD}x&8Dog7U;Mf0$yQD z3tcpO!qd-}yLIontUtU5oK%KM7_S`NFyZ``y!&nC>)vehxJH15e}ZM(^Y@_$OF5@UMJ+& zc?r*{v&xRkTNHN436FC=UNSzt9cYo`P5>+$CqPE{H8Yb4zy>?ZkOopvwY{5OpmYI4 zj)C)LF67-(?Y$0Rd9CS~>Tgx|m{cE|lh=U6)X=NSVaB1XC@)j>4hJErt0Ej)_)t#q z{J(+qbu0NoLjYjfU|SzBe&bLkC+u!s%d3ioC)8%ZWujqP!#C{q7 zORCjvIy)ouOADzFI)vhS(g*L)P;B&P4;k-psWN_mo@@mFOd^>id~j~yJrBmRz~a8P za-2Io_!}9r;R@)ZP?r5g92BL;Z=j#G{SUr=^gKkZA6o*%^IW*WU6xsMX1hYX))c)g zM?Z^3$T52^l)PL*BYZxJyw{`z;$n^R!RNr+S z&?u7Dq8~yk_5^3T4V1Zjcw3( z5kjZjkb{q9HnC;4S{1&$g0&isB2L6{eC@rz+J8*i8_SWwRuq3r4rjWLZMwe_#})=EKxlY=)oPErFXmjOq^eB`mlmO57Q zQJ8p@(D`))ke4zrfpi@KF3q+4Q$Mf9JWwzipf&m5xnC(L%u-c5Y zpr6JLR5xDUg39^@KFjGmodHKXFLW)XhYMXTbI2X=XlK5>sk+b_2{01ALAEt1`pV!7 z_OW*Up-5q&yo=LARz-mV`k$2sEQ~NLNK$77tWABBGo{`+;8|!bDtYE7)us5Z*lnHE zw^94sIDuatkH za2O0I4*!}5rnI)(X=gQi75^!gFj3(6f@W7FjZf`~euJu?$wkN+JpK=I&%;0R{%I|u z1_?w;f@}~zKqbG^FJFEvOZ-3LK;{s<8s{~#>8CS0DH5q6D}yC$Mfs0he-8ya4j3Ff zn0YwXO8JpW`TGlEDz(_M#<@YzfvCw-CMvlXW;I%k>`#?EI}FI-rj@W?^%@|Dz{0ah zXQl7V1lFK0P`ODPrPW|@X0HWkvY=UHH0o{WG-%qNug!<;0aJwj|0xVe9bkF-L*OQy z(*xf5m8^JMV3f!*)fF4t{Ws#k(a$zOSZcl5*av5N9r|GT(M#h0J=2VrXe(Fl#Yey@ zU#~TN8{|?Iz+|omYW{6Ly{0)42@v}dFVH1AO%6IjT;wxawrkNhJ2^V&Pk%sOq0|Ya z44$RF+;Cuv@9zxbJ_X56iNLr=FX&cd7FPkp7A(20Uj~oV7aQCKW||+f$N#ODW`!Z) z-xB^HC)tVr&G=gb`Q^mddaQO3C@to{?d|+^pgVyU=!|6-=FayY8r-8y>s$H8Zo5^k z4VGj>EWS00PP=dYGSZCgweHO@m6RgcDY=xzi;=RcG}RNjcp3r!3&gAvOy@&R3(B z>Y@>ey-2s_m!zvhr=PFbndBO3`F3LaI(2>p$nqmtqyfH7HweOr{~K$&39W&>?_cKj zYH@6{1ml%I+5Z#%wdS{%?-+!$uDRFlVcGwJgze`lIB9etKoY4kEmJP4$H2cRCOEWn z5bsR^zs=J!daT*LgX@x7I0uAAT`sC9pp}u%kqPSWylCl>tzXdL6E0>6xLMII)c$R^wI+)p1|NHF^Or&?*L%Rj4QJISPt7 zPjx_hG(-Czon7F*O=#J}JSo*dz@x#9>{kXY!-7P2b+k#cg1R{ai`27&^lJh7&k~~m zzoYD*?Cs6!SbICwjmTv-Ic$T8T+6IvD%=23YS{OD=Rf5Xt&X_Ja9qC zJFV+KO9XU^kLHp2!LPpnEr%kJX0K)z-Q4(>C1>OSYp5akLF@t06pCUYJy^`aPl7~- zw`w}frNlfQ^>79}XlKv8K7yT$aHV391h5QyD1e1|3TW|7=sB@#Ad7?jZ8&rN{033X4n=F*k@yjhI zP$gk*ITnSuW`@0?>jeBRFX07sR=IJg#p*CPCvc!}dAI@uwgQ^WIVU5PnSM4BgTsCd zhdhS`{Zj@|-Q!v)^~?0mc};zs7^3=Gi9~PvHN?6wZwFLfNJsyZ2`6K>ha{`-s^Xs> zobloMxO7pVN9+Q>^geE4bo8QP0r+zE0MO9}PzvNoutD^un(Uu$ z9D!drUxLm~7ya@h-OU>gC)ljSFj=!P{lW=Y9U(dZoxCTnBFeatk^&Rp*m?f#H@L@O zmRXQE+P_)+k@$oaL-O9mUVHYim$RDc?12$rt76jbNcp;E)FY!QbZkRmrwgf3^LK?L z_Q)7x3c8K?vFI0%2KmGLAMl<=DB@j3q08EkGokBy>4n!7=X?$x6@;{drL@ zYn-`<2sB6VrZAl2_wh$9j6wRW+VfP4`5&jeyyp#C4H=x)g3PI3KLMlyudV^s@)Em( z1XDI%p4&hUJ4W#hXyjJO*sq=&H*i*J33LaFF$8UKcrU3Khq}lz-vgi*${jP1aktGa z5#&oK*9hCNF?LO$s8Nf|l&<+jY4QZyex@7KmOP*T3&V>>dg$hsWP2Sl-)@tp7gm>D ze2WFy@tE=uY*ZT|vWVGSDry)D^g#YZ>{N!mbs4_*HLMD+wP?c9ShMI}NXhz%U_@^~ zg0a-gD8J!c8AG6#j4{Q7CMb~!=)@H?u{CxDG7OLPftis2M>~M!m$|gmpk(Otpv}ZJ zt@8LRJxWmxD}uuR?~e8mnZ|rSO(9 z>nZW-1rw6!gGKMuJPdHMo9VJ*-u#WuA3I40CqenfcjT9b=4!K~KKS=1azh1%wX>0b zdf6$~2{hyjCzyf5KX!sSiLCcRYb=i9JBQg%Bf#4E5Fm1tdil%69WU6;?$;WLMM}YW zgjKskR#csFp{{hD@(mfN$sq8{fa~|7igB&fht||Fh9ya+UabGx(mY{Gkqh^KwfE)G zRKEY(na6B1$=t?P=FFs>l37S>GDjIRMU*j9A!N=_=7`v+P(&m`C__T#IkOTa&gW_P zefyr@d*1h~_g(K=zjM~WehtSEmnsQ-a9 z=Mw=%%Z*8~-*qO!gONdnbxBrp;lLk6HvvL)|Cjt=5bvQnv)eS^Uj3=w55(>vrVMQm z+|&G==a(wy2hj}D9MEea^58#O0+9I!DnWOCIhucz#ZPr&>eS|(@W$u$;lW_XU!y`GI+-1ul+A|J10P_TuDGgx;b(n z*?y?1?DOK_J0vA?=HA*5uPMY>J>8m4)0XL;mHcP@dK_lAK8WaI08B@wMaC8RA1-xk zOFIup25gDhm>`Bk3uG0N!aSjxa zAosCU>~F#y!Q__L|E^RYtl!>Rf1nz}c`+lk5>_+t%nBf>rfGb}JJ9I806cy^62B_x z0(n6Xpk4CVq4!`VG%ZEUo>ci91;vg_vNu8w(r5;O+7Jh3%|!^ElNI_BlcCIAhom~5 zOU^)0NqQ zdnVIJh}-Kgz(?M8aPrJBB&bIbP{!K7$y@!OZsZ;Ap;J|H^vR`}Fd{y)>tExkM%}@2bx=bH# zF5$F}dKDKEs#3IwtMjitYx+Dk)v2&BMZesa#P-WbKwX;AE+W$D9WRhef*1d}mKO>d zy!iCB!PRXj{RWStj!aT&GoHZsft#Zwb{g@RX{!fpO|f)8+JxI!nn~zKf-KPADMl0b zKbE9EzasU_r1{fa_aa2_^)?HszdKth+5#hc0Xj)7ah)v05_*|x{G5`_QHVUi{`=p3 ztefD#sOm+8_lX6MAW`+h3r1JP&C%czQY{Mi^}<8@0OA2JmI2@Y(eNkV&m-UF z7n4}(bLQh391dd2GMGK&AR}-OA>#R?q8;{`;V@Fm9*70EnFv)BWt>5d8Sv~fW{-vd z(%dxzf7sGfhYSedQ|W#rETA5$_e80XTjN+4xNGjjH{E6l!cyuI(YJKB9-9jPMgBYg zsQ!WcXL@?kno`8D#B|Op4`LBZ?wkAZ{q(m&y0O4s;Uc!s!yU2*zv#qlD?4e)EEHmQalHebo2?sYA?JqwMU1xiPQr;BFY z6+$g6(4r{=^^6?#K+6F^^gaY@=jsAoJ&Vct8;nu#i z1-_R^bIH7Aq#;n)lIEmA^)kgCOyMFa`W}*eK$qJb>|H8|(J1$INpy(b01??BB4#1G zWPWFuVd_xqTTQ;?A5!=`fnTi(0x<&W@5Fi~9ea*3?G)f=bQy;yZMafcf-=EoO8mLq z-E`|D+01++|8Co!k+W#T*iO)~fnaeNfWGgX3 zWIjdRPcANjDH~sOORrnFwk9hie#&wGFNC14gxGgu>k%g2A*t8EkjQ-2B+t<;JC}{& zrc>VY6Oz__&?HzcSvEOIA`-CK$ff0{aVg`Bz&aSj4_(}Q<0RUE0q1h*!HGdWRFD>J zCMlYsL!x;zjjBNO+Q}1dTufjxX#S2L27PNvUM*gu{St$LSZsk{;7PByV$Z%>J+H|Z zxa<4&lRTg-vvthgqK`NA^E?lodVad*8Ox2wr6YWLfq7$oQkU`^#0(m9Iu=1*Wt&Gv z@O?woChj#vhAs;+OgZjLJPC22O-nC1t7()FG2$&kHuecnqI$fu@2#3oEnpjLF_~}K z`n+}L>XO?#$(!x>K2F}<`6OU;JqH4z7}(@ZNibi<&+H}79$5C4;=TW22l`+0NpF_L z=ah~qbH|fVh7KN{u9b1%r*dB)KS7Rwh}ZuFBGS@0?@RIfeR2tUVTMWGH?NSj-!(hU z^qqGYjww36?ptI_@Go5$|9O5B!+ zIl2B7@mz%|7@l=Po{3m3tluY)@Xn=PyuYHXUGQm84Ibi_000ZyG8ZDrbA>W#-a9C0 z{d?;yCcjsKAn@<(rgSOtg&%!Rm#UmXCX*#0X{&gWRV5*1N$rgY323()o zAT_}&VyG$4AyLZZ??KELbQ}YygIwI0i7;Hl@{gv#SzE}DD27OnosZ>hA{AhBML6s4 z+928(CZ%Q_4Tv$#2Pse=nEN4Hqx$aM)^N}my`Ov)zX*Q9$B^iM;jGQ}<|?>S4^>(~ zfnEVRCEMEqdOKho9NK|Ntonyl3Su;zU^juCHKe9QoB`3tw?Ieq7*VwU78cFqZ&Ce7 z?QaPMb-73vjyOmW*2YP?*d{vJ9ztZ}2)Y9&anJ->3@2;v`*pwqq@k4#H#fpw26O|l@R_jNQ6=xbsk3IUw?EDw(Wg? z-Oqn@JI1Uqc)K^eX^{1M)%E}2#8v&jb>hyBJp*hoku%Jlz}nxe(~ zMJ45tB2QX`OW9~h)Q8s`ym}%0=tsWx+8L_#-9ZJE-Mxry0#wBHS|fvn2+u|G>-|@a zo7{`x@3yk6Z9T^4z1E=vsOF3guQ_} z{95v~P$3#cNJ5Ni0dK~>PQDj&&@&dk5=z31%XB)L1hFx*iAL~hd9w446{kB zXhE^diuFw)$=W$A;jV{lN^3VLL0Ip%-Og1b1h+j6&;)H9XMw^c9AiinYspw3qfo;V z2GCmW$j@d5XWu&463g?Z>Q+q~YmD@OT~3NX!;Ev~2kDNH^w#un7U^2te)1T|Y#)xT z&uxB7?<+1C(Ca#o*YQRI`g|p8~9PeK}nY-v#(Yjt5BS0EC27b=;09k+jZK^&D zCrWVz`$;-a11t$6rAa9-q7VCQeZ9T$H17_6JEh;RFs_{WTNA?2nwGCkk}Ny3jD^2NYVAhqbpWG6lwh@lr=lUNUd0;i9I`vxPWua*$gZSBRg>$SD?%+^=yA zG7%b`Dqi;$f7d|83JvF|#thj*blF}i;fK21No=nU-s=FU%ejtK8|rk4lqkNAaZ>l@ zP645rhdEz#yh~$=Rh0TIm@2wV2)9~^ooP4JSp}8Hx4?PNdfe%~A1X#VZu)XfEg8Ej zww+=D6sSV$a2M&`38Yl>QA*OtR3X--BqwR0d`Ywlbtqi`LvW{hOCq;18|pc_1J3^{HL+_>1plHq&C$**3F4=sm*qxDy zc^#r>PR3AYovnFm*bpKUds_LeqWS)~DRqOHW84&eGd!97B4&lr%k;5<#N|I$_0`MQ zQ*z#aM=g_?JBF+jLmVBMVhih$lw{iJ%Zx0KRW8tsH|upP8n#DroLYkmprv5FHg%Mk z!XRi*HTi;XoPXuDjC5GTdtaCwg$S@PphS)yBoeaA3(3e>iCbyB=FCK23I9?|F3ypHZ#A(q$f86LcIb6_QfJ(JkvWZoti@TS$r9CIAB>fBDKH!i+eykFK+!n$mW&ogd^33>34`(ZvZkO%kKvu?qe z>;P6-TH&1FYl@b9Ifj_u?+3xHdt`|38;0QbSJ2!+q&${1YQaNl;XHr7dWAcaP#<;q zm6FEKW20eiQp-F>UJxkRa`O@^^56Y<3EnLrBRt6c*)@3~=N~V@yKEuwpZ#kdHbeC$ z`x(^thcSO%MJ$$xcJ|Y^6ZU~_vc!K44|3Hs;S=&b+j2>%cZAEFL%X$&2U%j)Z;#0{>P(Vvt;068d{+ zdpHT0Vh5fIY^V{A0rQCh{jA39NL@f~*(?6Ekh~<5gI`R}-IQtoy)OqW(2?v9AS^(N zdcU;Lmm|CBu(_5PB5EH4?z$tfap@yxfK|VCJ5j`_S&f$3)|lS6&y>!(;@e^P#E5$H6UD> zqp$uFuOlz$3)=>0H= zw5zfKU@vV%c9MYDoF!9lTFM4kKDc0uuyF!3UZ<;9UnY0^ldriiS#fET5^}S zmMhvNG&g}d+SjZcKuJRSX%%*SaZCZDYR=lRi!Y%l zbMjW097(5(#FB0p0db7Tx-4lnBu)l4jv?hINc26cpskT}Y!VFNr~|6eQMTJW+X;`Z zNmCtpQLW)0izP#6622vHd2;-s0)Zl3IENRmo^ls{+Hly`ML7M+e4#jc7W5ubR6_us z*dTwFmLGhlV%9INS7GxGA^Fz-X>4KprUu&Ik>*xGB+Qt>7`|jvi7N_-@6}tWm6l32 z%UaD)PUAiF2F{u7K%}hF6*ux`jcYR3bt`ti)$LMYPKTDm-i9I2v5~yDz?AguN4d@G z3hg|y=_t6#F$eb^xw_FVNROfPPFQV-W|KO|?)HMt3dZ?{2mMSPG$7twuK>4T(-M=*D*4cMm?lDmzNFd;PBn1 zu^@cFANc4FiF4S#^Vg~N0t%^40u<6eji3zoF0yRJ-DLrLK}O@h+-WAHlxPeuTyQ(8{bet<}XG#i>*)yM~F{c8RtZ`L}oVBxEn7 z+UHZ;1s%VE>swFJrwiqFD_QKv8F}z$oyt_WEnMJ1I@Rl&j15O;lI0#B_98ReO9)#c zxf&?hMK+P3$jymNzX<7LSa5G+Z%Pkfp3J{W)DbDTQNkZwWZE1u%H&5nUqniWRa^HhaNCCN6YX!$E2OP6X2&P4vE3a zzOrh~6k}h%)a`OWA10kf>gwH#Pv*uO?s5(H6~}?iF^J~W9+WVT5vzHeSmkIIHlQ&`YiP`mV?lND&XqV8@TPeUs5D&TOepY0#MFg@yS zRZ$)P9SRQtxYFk?_{OwL#58h~v759XN-gR(#fHJ?WtVY}EJV|dJ#yb4&?N%WkHNAP z2GzEB5=00ugx-5Rt6wnUeJKK!Gb@;+=q(SX zx#>+;trprfklvoKA&)o(rb4|Ys+-!D*j+zST zn&sXQqwN0r(&B)W$v3=YcA!n7$M{be(@%G(PPj(k6GIar35k`ss`!|K+mLsLw;lJX zkS7r^TLa61Y-nNEuO z8vQjy!Stlez<}6kSeEBoeCG%J8McR$Ldbsd*b3GcLU)LZ&3$#06~&o-6^YRzN%Wlg zuj~uL@KU0vb=)eFrZ8*Ey>*1bN+)qgT1`F~e~5mXB7>JEwbSWcn;W`_T_qH~^AAKr z9?zW+lyQE|Vj<~K1LdI?=O6{mwuaR2LXhZVO{3ybE~M;Ypuo$KW<9_Vc^@bDMA9}I zfgGdVTEI8b5QGLlcz*lddvx&Dt&Fc46zj5_CSJ|fOXEWi6_{sYU+jQ(uGw9&6Qej8 zkEhD!4T{#mqSRD(Sw=UDl=mddKb2ZdV_xWgp`!!ACaqI}u5_HKu)u#WppHGHuYq5x zxG8!g!Z{J9k3%b|0oDx%#z>rs(!2_@om+M9sI?axY?{$C3O)W}YtN=uLCa<8gv}-t>*b(%$3n(RNmmaOF@(T%l10>`$)v_+O@6Hy|!^Gt0LjrtdH8M&U;_%li-$2&dHU2YVkWg_{$ z*pMxJUMm^Ty%4Kg3d$Y(>l{zq83u*q5V45!eP79A<17@!$MM^$+(GJC%YM9-Tab0p zaFWo)1+S*k+zAq%Xq>%9AnQec^^C>nAAo4_SfK78XkT8`NP5Yjt-iwod*9CS)5dJ& zy9BH-tr&3;^U^iHSS$r0+9tWYCs8)9kv&N}yt; zWRLP+^spJAt@&Vi++Ft1t0*{12z*Fsola$TbH-V>=zD)&IF|4XznH^h!NE#g+H23R ze%U?e-z_BmdAaB=JK}Z%`-cki-oHU4f+uwwVH$PVobH$UYitQ9B-x;bKKV2!TJ^{8 z^6LvyT2PgSxkQ`A{{=*2u{m_Q|DuP@oMwzIt=oAL=Jx(?SErUKOzfaRPz}X+BSbDV zg4}u_N!+L8Z^NaVLoEMYNpIk(%Lr_~yZNqehY4cvADJ2<3^E=nukgbB)PtAXQNT=m zra;VtS)g?4b*YLF?mNy^Bx{-$_j8maC4;=a1p0z9dJHy3w9EFzvfEBVq&0QIA^57t zkNUm}g0@px3|uF(!Z|5H7U4^BVY_r5*j^T(Ytth-9TzE}Oujmr3TuvT8nHi6kzULF zNZF5+>0ZmKn)13-H70LQeN-Fmt(w2CMA9(~P3 z-y<+_-Ttr(;^PMJCrEtEqr`ASI-l)VlU0Qo6sMvryH~$ewTDm3QKXOrh;&E6aG~+; zgFwb|+0j6v1{#G0+LzHBXNwg$;Z_*9Q4gvw7G!HoJk(wx+S2dQrf`ajJ@K?D;e%%h ztS&CRE4A2!(@k^R+=5MfpL;@ONPVanubh5$1Q62s&x+hZ(b!Cnt9$7+Nf?Zha#xB$ z8!xMK(fG4)GWII1<=7(Rh}K9BD=<$Y;`eq)s9EhVuINa8|4!3$V9y>}SM?Le3?9IF z$oRtGi~;ItD?WRZ5(s<4(dYiKBU|)qK%)L*Kon6UOZDjP*{d2nkqCbLwDdv2)UAnW zEHjwKp|ixtnL=$8MF@fUX7viyAk_;8ge*}q2m6w#ra=B;(fwK!nvUE2{;M`vSDdM!0xw^A+d`!t1rdeCtGF-o<}4P zt>fPGr;kA}>h+{r1k_yCDMKMuuG220c%c;86X+Ar(pQ0B+v|Sj>z8G&JJ45&;9O~r z;o=hG@4BEgL)E9GcoaclRqGuJX~x9+2BDL}Bm#P|6$lcY(h+XDwv!c@u-~YJ?kLPM z0rCXXc<3pHLO}UpEd)f~lA3G@Mre>%T0Wn$|oxDtJEH%D-L4GWW4q z)aY0S0=cq)0+1}O+xhzD`twFP(!B|EA?vUed;uQ8kfN@XpM|7Uo8lw3dG66PcJoFG z>E}wmp#1`dTaA+C{bS*(}83&{Y?~8K}c^g772b#jYh=$Sh zP#E>TIL8^7aUyukVQ^d|h05|1&eh*YjF!19epD@!iFkmI*b!PWH+R)3^j-yU3-$&S zH!IfQ4=3>kT|o!R>@oTVKMNSYZMktQGtha$#psp*AQ1}V zDZ>IsGG3;$!!`76`=}fQ8cbLXoFB8+6HWlkz~ZNdDK=VC*gv~kU*AX?IS3UFd#UOy ziQn^_VEq^45xve29`!}6j)mMqVvB>ClD`0M64^+5Y^fWiGEWw%V5otV>j8wtuW|3h zCGk9@9t^0a{Vp|b6&-YOJ(VsaE071)EVm5)U~+GQl@$q5YhNGOw{>Eg*?8SnB4I_V z-t;sE!|YVz271O2xQC^g4~tqS@O)zAsrv-8a7U=u;c#(gcjH)h55`sl@#hSuTfZIi zoAw_tL)etVv@cm|#+rz2qqh~mUM~+>$ygDd-hhqZB@Rj!AHe6C&0F=Gz=V75yI_6_ zFI)Bb6sxe!TK3?+C>iN_E{cb7_2@5wx&9J#3t*fQr;*?|w4sxZ{%oKW@n#iQOUpeu z2d&PV%vC)X)=rYEznIX|TG^y4X=zmGQ+#~?{ITabx>`6pX!%XgM$hnMkW{6tJ+DrP z|M-S^Zm8Nr=f-WP@>kjV2``l~EOO$n%99%o>r3n)w9|2D3|7G&)aNW#vJ*`DPanOF zGQiUB9pUSGM(k~O$}oVMw*44FD?WV4@^Rm*_x3c{Rawwv=FB9E9hj!BApmWt1$B(W zD_?cq%UN9=usE{qmGicxi{kUvd3E_I@N46duKYFR_6+RP6}i5yI(Z(9FApRfn*OYR zO^;#%RlG7gek8lDDMTt*X^%Y&dsRNP6@+P$3 zt(Xx$;Oi2hB!6l`7Z5E~k?#I{Qd#aZKKIFfLr0g2nMX1Nq}ZL zbcsrC61Bjw?_;<&72m%IySRpcEJL-;MqKm!&X(_VkbqeZz8wh7`dq%iUiL-;IZF%! zZpNpj`JjbXzgY~erdRcGI-Z*AEUo0#Tp>Zc4{nQgfA)TCS;EsU^_nT9lAbfdD5-|l zJj85F7$Jqb3$=$2W#j<0kT08LbIJZ~KbOVI`OFY29Iw!4piiUCZE6NX2@ljX+Q-wj zu5RPQXnDgbt6Za1Zm5l&3thAoFSt|_x(NJKB7MpvC3zpIa;dk~8x=2*u%0(0Dkcw! z=SJTd*Plwad2pR%=N2}Dw!S=EGh=fEEi{a)O2S#+xK!{;0X2(gS);x;oeq>C3MH9> z+|6dJdh>X5cG#J{wxOG|*4Gkm)x;guWOcF9N26Hni*bZytn@@W?tE_Tki2t~B6;fT zi$*FxBdGs31h6fP3foi5YpM?kwrx^D)TF>dj#^oMuAFp6{BQ-Uf&6e!$ue})g49a_Et~jWrme(%QsCYm zCKJB}v%XO#yC&sG_j2H9M6S||UWr=fFgYG0S~=uC22MNb7vJemHu0+I^KdGKe5k-* z$5xKNw>>P79whY?KcYxUFk+CnG|gIauT6*QvAIkeZj6BU5pQpvr1Te_Dukak0$*{329v~8nK5(t4-)3_Qdpf9 z!r)qP1}o=BNuaCFF|kWTuTVyv)Jib6`(|WkqjHnT2pyh1gxAy(&q9SLmXV*IGGkOY z`jIt@83Gxp7h09a4`?utSphE*aZiGou?RLy5jkiYLX(Lt3*-$I($y-6A>ehq6|;8} zP|#*K#Y(J}EFBMSdzU|o>;RGaTplq=h(}5=?b<+!Xn$;2r<&$;8?FolBOGYI3I^t< z>SHR?KGyXoP!y!7D0XkNYD4i?sX1$%>ccVm&UsJ7c;96Ba0F92?m0_1b2V1qOtF9A zlM1=Wt8aKKa~pQ@Fq4K>R@6HcV=GOMrDKAE&6#nlNe&|u^aUZ-;P1v6eHMZn>^hj8xo1VPH@0WhO%N+?! z0y9q=q9yr#irhi>0T&mtY3%<61Ao0y4joJzx^lQ$1bzSdLIDD|?`J*a`RnU&q<@LS zUg8AhmLbRAuZ@7hAQXFVL_p(=Njcr$&KXO%kC^&->mHl3e_Y%Tmf;96FumvOHUGTo zSS%@AHZ_*{cfa!IX!HWHqkkmnG5U|eB%rXN)g^fKKZpIFLHL)r6M}Zhik&SVkr0nE Q?SVh)Dmo`hlr69Q8zsD(K>z>% literal 0 HcmV?d00001 diff --git a/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java b/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java index 0da239ca4..88c58e053 100644 --- a/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java +++ b/java/api/src/main/java/io/ray/api/call/BaseTaskCaller.java @@ -12,9 +12,20 @@ public class BaseTaskCaller> { private CallOptions.Builder builder = new CallOptions.Builder(); /** - * Set a custom resource requirement for resource {@code name}. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set a name for this task. + * + * @param name task name + * @return self + * @see CallOptions.Builder#setName(java.lang.String) + */ + public T setName(String name) { + builder.setName(name); + return self(); + } + + /** + * Set a custom resource requirement for resource {@code name}. This method can be called multiple + * times. If the same resource is set multiple times, the latest quantity will be used. * * @param name resource name * @param value resource capacity @@ -27,9 +38,8 @@ public class BaseTaskCaller> { } /** - * Set custom requirements for multiple resources. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set custom requirements for multiple resources. This method can be called multiple times. If + * the same resource is set multiple times, the latest quantity will be used. * * @param resources requirements for multiple resources. * @return self @@ -48,5 +58,4 @@ public class BaseTaskCaller> { protected CallOptions buildOptions() { return builder.build(); } - } diff --git a/java/api/src/main/java/io/ray/api/options/CallOptions.java b/java/api/src/main/java/io/ray/api/options/CallOptions.java index 7f086a267..37e474d55 100644 --- a/java/api/src/main/java/io/ray/api/options/CallOptions.java +++ b/java/api/src/main/java/io/ray/api/options/CallOptions.java @@ -3,26 +3,36 @@ package io.ray.api.options; import java.util.HashMap; import java.util.Map; -/** - * The options for RayCall. - */ +/** The options for RayCall. */ public class CallOptions extends BaseTaskOptions { - private CallOptions(Map resources) { + public final String name; + + private CallOptions(String name, Map resources) { super(resources); + this.name = name; } - /** - * This inner class for building CallOptions. - */ + /** This inner class for building CallOptions. */ public static class Builder { + private String name; private Map resources = new HashMap<>(); /** - * Set a custom resource requirement for resource {@code name}. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set a name for this task. + * + * @param name task name + * @return self + */ + public Builder setName(String name) { + this.name = name; + return this; + } + + /** + * Set a custom resource requirement for resource {@code name}. This method can be called + * multiple times. If the same resource is set multiple times, the latest quantity will be used. * * @param name resource name * @param value resource capacity @@ -34,9 +44,8 @@ public class CallOptions extends BaseTaskOptions { } /** - * Set custom requirements for multiple resources. - * This method can be called multiple times. If the same resource is set multiple times, - * the latest quantity will be used. + * Set custom requirements for multiple resources. This method can be called multiple times. If + * the same resource is set multiple times, the latest quantity will be used. * * @param resources requirements for multiple resources. * @return self @@ -47,7 +56,7 @@ public class CallOptions extends BaseTaskOptions { } public CallOptions build() { - return new CallOptions(resources); + return new CallOptions(name, resources); } } } diff --git a/java/test/src/main/java/io/ray/test/TaskNameTest.java b/java/test/src/main/java/io/ray/test/TaskNameTest.java new file mode 100644 index 000000000..502bb80d3 --- /dev/null +++ b/java/test/src/main/java/io/ray/test/TaskNameTest.java @@ -0,0 +1,19 @@ +package io.ray.test; + +import io.ray.api.Ray; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** Task Name Test. */ +public class TaskNameTest extends BaseTest { + + private static int testFoo() { + return 0; + } + + /** Test setting task name at task submission time. */ + @Test + public void testSetName() { + Assert.assertEquals(0, (int) Ray.task(TaskNameTest::testFoo).setName("foo").remote().get()); + } +} diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b5c4722d2..110e6f881 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -343,6 +343,7 @@ def switch_worker_log_if_needed(worker, next_job_id): cdef execute_task( CTaskType task_type, + const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, @@ -386,16 +387,18 @@ cdef execute_task( extra_data = (b'{"name": ' + function_name.encode("ascii") + b' "task_id": ' + task_id.hex().encode("ascii") + b'}') + task_name = name.decode("utf-8") + title = f"ray::{task_name}" + if task_type == TASK_TYPE_NORMAL_TASK: - title = "ray::{}()".format(function_name) next_title = "ray::IDLE" function_executor = execution_info.function else: actor = worker.actors[core_worker.get_actor_id()] class_name = actor.__class__.__name__ - title = "ray::{}.{}()".format(class_name, function_name) - next_title = "ray::{}".format(class_name) - worker_name = "ray_{}_{}".format(class_name, os.getpid()) + next_title = f"ray::{class_name}" + pid = os.getpid() + worker_name = f"ray_{class_name}_{pid}" if c_resources.find(b"memory") != c_resources.end(): worker.memory_monitor.set_heap_limit( worker_name, @@ -470,8 +473,7 @@ cdef execute_task( if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): actor = worker.actors[core_worker.get_actor_id()] class_name = actor.__class__.__name__ - actor_title = "{}({}, {})".format( - class_name, repr(args), repr(kwargs)) + actor_title = f"{class_name}({args!r}, {kwargs!r})" core_worker.set_actor_title(actor_title.encode("utf-8")) # Execute the task. with core_worker.profile_event(b"task:execute"): @@ -535,6 +537,7 @@ cdef execute_task( cdef CRayStatus task_execution_handler( CTaskType task_type, + const c_string task_name, const CRayFunction &ray_function, const unordered_map[c_string, double] &c_resources, const c_vector[shared_ptr[CRayObject]] &c_args, @@ -547,8 +550,9 @@ cdef CRayStatus task_execution_handler( try: # The call to execute_task should never raise an exception. If # it does, that indicates that there was an internal error. - execute_task(task_type, ray_function, c_resources, c_args, - c_arg_reference_ids, c_return_ids, returns) + execute_task(task_type, task_name, ray_function, c_resources, + c_args, c_arg_reference_ids, c_return_ids, + returns) except Exception: traceback_str = traceback.format_exc() + ( "An unexpected internal error occurred while the worker " @@ -985,6 +989,7 @@ cdef class CoreWorker: Language language, FunctionDescriptor function_descriptor, args, + c_string name, int num_returns, resources, int max_retries, @@ -1002,7 +1007,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions( - num_returns, c_resources) + name, num_returns, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, language, args, &args_vector) @@ -1112,6 +1117,7 @@ cdef class CoreWorker: ActorID actor_id, FunctionDescriptor function_descriptor, args, + c_string name, int num_returns, double num_method_cpus): @@ -1126,7 +1132,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): if num_method_cpus > 0: c_resources[b"CPU"] = num_method_cpus - task_options = CTaskOptions(num_returns, c_resources) + task_options = CTaskOptions(name, num_returns, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, language, args, &args_vector) diff --git a/python/ray/actor.py b/python/ray/actor.py index 7a60ec1ec..bc6959d7d 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -100,7 +100,27 @@ class ActorMethod: def remote(self, *args, **kwargs): return self._remote(args, kwargs) - def _remote(self, args=None, kwargs=None, num_returns=None): + def options(self, **options): + """Convenience method for executing an actor method call with options. + + Same arguments as func._remote(), but returns a wrapped function + that a non-underscore .remote() can be called on. + + Examples: + # The following two calls are equivalent. + >>> actor.my_method._remote(args=[x, y], name="foo", num_returns=2) + >>> actor.my_method.options(name="foo", num_returns=2).remote(x, y) + """ + + func_cls = self + + class FuncWrapper: + def remote(self, *args, **kwargs): + return func_cls._remote(args=args, kwargs=kwargs, **options) + + return FuncWrapper() + + def _remote(self, args=None, kwargs=None, name="", num_returns=None): if num_returns is None: num_returns = self._num_returns @@ -112,6 +132,7 @@ class ActorMethod: self._method_name, args=args, kwargs=kwargs, + name=name, num_returns=num_returns) # Apply the decorator if there is one. @@ -317,8 +338,10 @@ class ActorClass: max_task_retries, num_cpus, num_gpus, memory, object_store_memory, resources): for attribute in [ - "remote", "_remote", "_ray_from_modified_class", - "_ray_from_function_descriptor" + "remote", + "_remote", + "_ray_from_modified_class", + "_ray_from_function_descriptor", ]: if hasattr(modified_class, attribute): logger.warning("Creating an actor from class " @@ -679,6 +702,7 @@ class ActorHandle: method_name, args=None, kwargs=None, + name="", num_returns=None): """Method execution stub for an actor handle. @@ -691,6 +715,7 @@ class ActorHandle: method_name: The name of the actor method to execute. args: A list of arguments for the actor method. kwargs: A dictionary of keyword arguments for the actor method. + name (str): The name to give the actor method call task. num_returns (int): The number of return values for the method. Returns: @@ -724,7 +749,7 @@ class ActorHandle: object_refs = worker.core_worker.submit_actor_task( self._ray_actor_language, self._ray_actor_id, function_descriptor, - list_args, num_returns, self._ray_actor_method_cpus) + list_args, name, num_returns, self._ray_actor_method_cpus) if len(object_refs) == 1: object_refs = object_refs[0] diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index de489a9fe..d234c356c 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -241,7 +241,7 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef cppclass CTaskOptions "ray::TaskOptions": CTaskOptions() - CTaskOptions(int num_returns, + CTaskOptions(c_string name, int num_returns, unordered_map[c_string, double] &resources) cdef cppclass CActorCreationOptions "ray::ActorCreationOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index c2cb1f575..f7a0d14c5 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -217,6 +217,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_string stderr_file (CRayStatus( CTaskType task_type, + const c_string name, const CRayFunction &ray_function, const unordered_map[c_string, double] &resources, const c_vector[shared_ptr[CRayObject]] &args, diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 02a4735a2..7ddad8a0a 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -152,7 +152,8 @@ class RemoteFunction: resources=None, max_retries=None, placement_group=None, - placement_group_bundle_index=-1): + placement_group_bundle_index=-1, + name=""): """Submit the remote function for execution.""" worker = ray.worker.global_worker worker.check_connected() @@ -212,7 +213,7 @@ class RemoteFunction: "Cross language remote function " \ "cannot be executed locally." object_refs = worker.core_worker.submit_task( - self._language, self._function_descriptor, list_args, + self._language, self._function_descriptor, list_args, name, num_returns, resources, max_retries, placement_group.id, placement_group_bundle_index) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 6b7cc662b..5d885715d 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -14,6 +14,10 @@ import ray import ray.test_utils import ray.cluster_utils +# NOTE: We have to import setproctitle after ray because we bundle setproctitle +# with ray. +import setproctitle + def test_caching_actors(shutdown_only): # Test defining actors before ray.init() has been called. @@ -673,6 +677,33 @@ def test_multiple_return_values(ray_start_regular_shared): assert ray.get([id3a, id3b, id3c]) == [1, 2, 3] +def test_options_num_returns(ray_start_regular_shared): + @ray.remote + class Foo: + def method(self): + return 1, 2 + + f = Foo.remote() + + obj = f.method.remote() + assert ray.get(obj) == (1, 2) + + obj1, obj2 = f.method.options(num_returns=2).remote() + assert ray.get([obj1, obj2]) == [1, 2] + + +def test_options_name(ray_start_regular_shared): + @ray.remote + class Foo: + def method(self, name): + assert setproctitle.getproctitle() == f"ray::{name}" + + f = Foo.remote() + + ray.get(f.method.options(name="foo").remote("foo")) + ray.get(f.method.options(name="bar").remote("bar")) + + def test_define_actor(ray_start_regular_shared): @ray.remote class Test: diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index ffb6e0cea..14ef5b634 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -35,7 +35,7 @@ def attempt_to_load_balance(remote_function, [remote_function.remote(*args) for _ in range(total_tasks)]) names = set(locations) counts = [locations.count(name) for name in names] - logger.info("Counts are {}.".format(counts)) + logger.info(f"Counts are {counts}.") if (len(names) == num_nodes and all(count >= minimum_count for count in counts)): break @@ -346,6 +346,28 @@ def test_ray_setproctitle(ray_start_2_cpus): ray.get(unique_1.remote()) +def test_ray_task_name_setproctitle(ray_start_2_cpus): + method_task_name = "foo" + + @ray.remote + class UniqueName: + def __init__(self): + assert setproctitle.getproctitle() == "ray::UniqueName.__init__()" + + def f(self): + assert setproctitle.getproctitle() == f"ray::{method_task_name}" + + task_name = "bar" + + @ray.remote + def unique_1(): + assert task_name in setproctitle.getproctitle() + + actor = UniqueName.remote() + ray.get(actor.f.options(name=method_task_name).remote()) + ray.get(unique_1.options(name=task_name).remote()) + + @pytest.mark.skipif( os.getenv("TRAVIS") is None, reason="This test should only be run on Travis.") @@ -508,7 +530,7 @@ def test_invalid_unicode_in_worker_log(shutdown_only): # Wait till first worker log file is created. while True: - log_file_paths = glob.glob("{}/worker*.out".format(logs_dir)) + log_file_paths = glob.glob(f"{logs_dir}/worker*.out") if len(log_file_paths) == 0: time.sleep(0.2) else: @@ -546,13 +568,13 @@ def test_move_log_files_to_old(shutdown_only): # Make sure no log files are in the "old" directory before the actors # are killed. - assert len(glob.glob("{}/old/worker*.out".format(logs_dir))) == 0 + assert len(glob.glob(f"{logs_dir}/old/worker*.out")) == 0 # Now kill the actors so the files get moved to logs/old/. [a.__ray_terminate__.remote() for a in actors] while True: - log_file_paths = glob.glob("{}/old/worker*.out".format(logs_dir)) + log_file_paths = glob.glob(f"{logs_dir}/old/worker*.out") if len(log_file_paths) > 0: with open(log_file_paths[0], "r") as f: assert "function f finished\n" in f.readlines() @@ -641,7 +663,7 @@ Blacklisted: No """ constraints_dict = resource_spec._constraints_from_gpu_info(info_string) expected_dict = { - "{}V100".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1 + f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}V100": 1, } assert constraints_dict == expected_dict @@ -658,7 +680,7 @@ Blacklisted: No """ constraints_dict = resource_spec._constraints_from_gpu_info(info_string) expected_dict = { - "{}T4".format(ray_constants.RESOURCE_CONSTRAINT_PREFIX): 1 + f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}T4": 1, } assert constraints_dict == expected_dict diff --git a/src/ray/common/function_descriptor.h b/src/ray/common/function_descriptor.h index 17abcdc2c..149a4063d 100644 --- a/src/ray/common/function_descriptor.h +++ b/src/ray/common/function_descriptor.h @@ -51,7 +51,14 @@ class FunctionDescriptorInterface : public MessageWrapper Subtype *As() { @@ -79,6 +86,8 @@ class EmptyFunctionDescriptor : public FunctionDescriptorInterface { inline bool operator!=(const EmptyFunctionDescriptor &other) const { return false; } virtual std::string ToString() const { return "{type=EmptyFunctionDescriptor}"; } + + virtual std::string CallString() const { return ""; } }; class JavaFunctionDescriptor : public FunctionDescriptorInterface { @@ -120,6 +129,12 @@ class JavaFunctionDescriptor : public FunctionDescriptorInterface { ", signature=" + typed_message_->signature() + "}"; } + virtual std::string CallString() const { + const std::string &class_name = typed_message_->class_name(); + const std::string &function_name = typed_message_->function_name(); + return class_name.empty() ? function_name : class_name + "." + function_name; + } + const std::string &ClassName() const { return typed_message_->class_name(); } const std::string &FunctionName() const { return typed_message_->function_name(); } @@ -174,8 +189,13 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface { } virtual std::string CallSiteString() const { - return typed_message_->module_name() + "." + typed_message_->class_name() + "." + - typed_message_->function_name(); + return typed_message_->module_name() + "." + CallString(); + } + + virtual std::string CallString() const { + const std::string &class_name = typed_message_->class_name(); + const std::string &function_name = typed_message_->function_name(); + return class_name.empty() ? function_name : class_name + "." + function_name; } const std::string &ModuleName() const { return typed_message_->module_name(); } @@ -229,6 +249,12 @@ class CppFunctionDescriptor : public FunctionDescriptorInterface { ", exec_function_offset=" + typed_message_->exec_function_offset() + "}"; } + virtual std::string CallString() const { + return typed_message_->lib_name() + "+" + typed_message_->function_offset(); + } + + virtual std::string DefaultTaskName() const { return CallString(); } + const std::string &LibName() const { return typed_message_->lib_name(); } const std::string &FunctionOffset() const { return typed_message_->function_offset(); } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index e9fa7ab4e..8f880e077 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -192,6 +192,8 @@ bool TaskSpecification::IsDriverTask() const { return message_->type() == TaskType::DRIVER_TASK; } +const std::string TaskSpecification::GetName() const { return message_->name(); } + Language TaskSpecification::GetLanguage() const { return message_->language(); } bool TaskSpecification::IsNormalTask() const { @@ -299,8 +301,9 @@ std::string TaskSpecification::DebugString() const { // Print function descriptor. stream << FunctionDescriptor()->ToString(); - stream << ", task_id=" << TaskId() << ", job_id=" << JobId() - << ", num_args=" << NumArgs() << ", num_returns=" << NumReturns(); + stream << ", task_id=" << TaskId() << ", task_name=" << GetName() + << ", job_id=" << JobId() << ", num_args=" << NumArgs() + << ", num_returns=" << NumReturns(); if (IsActorCreationTask()) { // Print actor creation task spec. diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index e300c7624..d079627ee 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -135,6 +135,9 @@ class TaskSpecification : public MessageWrapper { Language GetLanguage() const; + // Returns the task's name. + const std::string GetName() const; + /// Whether this task is a normal task. bool IsNormalTask() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index c0670fd2b..195e5c8dd 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -80,7 +80,7 @@ class TaskSpecBuilder { /// /// \return Reference to the builder object itself. TaskSpecBuilder &SetCommonTaskSpec( - const TaskID &task_id, const Language &language, + const TaskID &task_id, const std::string name, const Language &language, const ray::FunctionDescriptor &function_descriptor, const JobID &job_id, const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id, const rpc::Address &caller_address, uint64_t num_returns, @@ -88,6 +88,7 @@ class TaskSpecBuilder { const std::unordered_map &required_placement_resources, const PlacementGroupID &placement_group_id) { message_->set_type(TaskType::NORMAL_TASK); + message_->set_name(name); message_->set_language(language); *message_->mutable_function_descriptor() = function_descriptor->GetMessage(); message_->set_job_id(job_id.Binary()); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 2cd161552..f2b1765c6 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -54,9 +54,12 @@ class RayFunction { /// Options for all tasks (actor and non-actor) except for actor creation. struct TaskOptions { TaskOptions() {} - TaskOptions(int num_returns, std::unordered_map &resources) - : num_returns(num_returns), resources(resources) {} + TaskOptions(std::string name, int num_returns, + std::unordered_map &resources) + : name(name), num_returns(num_returns), resources(resources) {} + /// The name of this task. + std::string name; /// Number of returns of this task. int num_returns = 1; /// Resources required by this task. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index abd3d7712..9315e07ea 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -32,15 +32,16 @@ const int kInternalHeartbeatMillis = 1000; void BuildCommonTaskSpec( ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const TaskID ¤t_task_id, const int task_index, const TaskID &caller_id, - const ray::rpc::Address &address, const ray::RayFunction &function, + const std::string name, const TaskID ¤t_task_id, const int task_index, + const TaskID &caller_id, const ray::rpc::Address &address, + const ray::RayFunction &function, const std::vector> &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, std::vector *return_ids, const ray::PlacementGroupID &placement_group_id) { // Build common task spec. builder.SetCommonTaskSpec( - task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, + task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, required_resources, required_placement_resources, placement_group_id); // Set task arguments. @@ -1268,8 +1269,11 @@ void CoreWorker::SubmitTask(const RayFunction &function, auto constrained_resources = AddPlacementGroupConstraint( task_options.resources, placement_options.first, placement_options.second); const std::unordered_map required_resources; + auto task_name = task_options.name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : task_options.name; // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, task_options.num_returns, constrained_resources, required_resources, return_ids, @@ -1310,16 +1314,21 @@ Status CoreWorker::CreateActor(const RayFunction &function, auto new_resource = AddPlacementGroupConstraint( actor_creation_options.resources, actor_creation_options.placement_options.first, actor_creation_options.placement_options.second); - BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, + const auto actor_name = actor_creation_options.name; + const auto task_name = + actor_name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : actor_name + ":" + function.GetFunctionDescriptor()->CallString(); + BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, new_resource, new_placement_resources, &return_ids, actor_creation_options.placement_options.first); - builder.SetActorCreationTaskSpec( - actor_id, actor_creation_options.max_restarts, - actor_creation_options.dynamic_worker_options, - actor_creation_options.max_concurrency, actor_creation_options.is_detached, - actor_creation_options.name, actor_creation_options.is_asyncio, extension_data); + builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts, + actor_creation_options.dynamic_worker_options, + actor_creation_options.max_concurrency, + actor_creation_options.is_detached, actor_name, + actor_creation_options.is_asyncio, extension_data); // Add the actor handle before we submit the actor creation task, since the // actor handle must be in scope by the time the GCS sends the @@ -1340,7 +1349,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, if (task_spec.IsDetachedActor()) { // Since local mode doesn't pass GCS actor management code path, // it just register actor names in memory. - local_mode_named_actor_registry_.emplace(actor_creation_options.name, actor_id); + local_mode_named_actor_registry_.emplace(actor_name, actor_id); } ExecuteTaskLocalMode(task_spec); } else { @@ -1412,7 +1421,10 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index, actor_handle->GetActorID()); const std::unordered_map required_resources; - BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, + const auto task_name = task_options.name.empty() + ? function.GetFunctionDescriptor()->DefaultTaskName() + : task_options.name; + BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, required_resources, return_ids, PlacementGroupID::Nil()); @@ -1687,8 +1699,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, CoreWorkerProcess::SetCurrentThreadWorkerId(GetWorkerID()); status = options_.task_execution_callback( - task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args, - arg_reference_ids, return_ids, return_objects); + task_type, task_spec.GetName(), func, + task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids, + return_ids, return_objects); absl::optional caller_address( options_.is_local_mode ? absl::optional() @@ -2143,6 +2156,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & stats->set_task_queue_length(task_queue_length_); stats->set_num_executed_tasks(num_executed_tasks_); stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope()); + stats->set_current_task_name(current_task_.GetName()); stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString()); stats->set_ip_address(rpc_address_.ip_address()); stats->set_port(rpc_address_.port()); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 78e786930..5d4c68134 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -57,7 +57,7 @@ struct CoreWorkerOptions { // Callback that must be implemented and provided by the language-specific worker // frontend to execute tasks and return their results. using TaskExecutionCallback = std::function &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index e1acf26f5..7f510b2c1 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -105,7 +105,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( RayConfig::instance().initialize(raylet_config); auto task_execution_callback = - [](ray::TaskType task_type, const ray::RayFunction &ray_function, + [](ray::TaskType task_type, const std::string task_name, + const ray::RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 0541b0a4b..118340e39 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -90,13 +90,18 @@ inline std::unordered_map ToResources(JNIEnv *env, inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) { std::unordered_map resources; + std::string name = ""; if (callOptions) { jobject java_resources = env->GetObjectField(callOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); + auto java_name = (jstring)env->GetObjectField(callOptions, java_call_options_name); + if (java_name) { + name = JavaStringToNativeString(env, java_name); + } } - ray::TaskOptions task_options{numReturns, resources}; + ray::TaskOptions task_options{name, numReturns, resources}; return task_options; } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index 1213de4f6..ed1860b15 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -75,6 +75,9 @@ jfieldID java_function_arg_value; jclass java_base_task_options_class; jfieldID java_base_task_options_resources; +jclass java_call_options_class; +jfieldID java_call_options_name; + jclass java_actor_creation_options_class; jfieldID java_actor_creation_options_global; jfieldID java_actor_creation_options_name; @@ -198,6 +201,10 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_base_task_options_resources = env->GetFieldID(java_base_task_options_class, "resources", "Ljava/util/Map;"); + java_call_options_class = LoadClass(env, "io/ray/api/options/CallOptions"); + java_call_options_name = + env->GetFieldID(java_call_options_class, "name", "Ljava/lang/String;"); + java_placement_group_class = LoadClass(env, "io/ray/runtime/placementgroup/PlacementGroupImpl"); java_placement_group_id = diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 37b7b0544..a5c14d76f 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -128,6 +128,11 @@ extern jclass java_base_task_options_class; /// resources field of BaseTaskOptions class extern jfieldID java_base_task_options_resources; +/// CallOptions class +extern jclass java_call_options_class; +/// name field of CallOptions class +extern jfieldID java_call_options_name; + /// ActorCreationOptions class extern jclass java_actor_creation_options_class; /// global field of ActorCreationOptions class diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index fdae29a66..b03218270 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -226,7 +226,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(const ActorID &actor_id, bool w int CoreWorkerTest::GetActorPid(const ActorID &actor_id, std::unordered_map &resources) { std::vector> args; - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "GetWorkerPid", "", "", "")}; @@ -308,7 +308,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(new TaskArgByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -350,7 +350,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(new TaskArgByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -412,7 +412,7 @@ void CoreWorkerTest::TestActorRestart( args.emplace_back(new TaskArgByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -455,7 +455,7 @@ void CoreWorkerTest::TestActorFailure( args.emplace_back(new TaskArgByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -539,12 +539,12 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { RAY_LOG(INFO) << "start creating " << num_tasks << " PushTaskRequests"; rpc::Address address; for (int i = 0; i < num_tasks; i++) { - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; auto num_returns = options.num_returns; TaskSpecBuilder builder; - builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), + builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, RandomTaskId(), address, num_returns, resources, resources, PlacementGroupID::Nil()); @@ -587,7 +587,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector()))); - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index ff21188ad..83ab5cdd7 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -324,9 +324,10 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r const ray::FunctionDescriptor &function_descriptor) { TaskSpecBuilder builder; rpc::Address empty_address; - builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor, - JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address, - 1, resources, resources, PlacementGroupID::Nil()); + builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON, + function_descriptor, JobID::Nil(), TaskID::Nil(), 0, + TaskID::Nil(), empty_address, 1, resources, resources, + PlacementGroupID::Nil()); return builder.Build(); } diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index d705d97e5..29b6a26f0 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -50,8 +50,8 @@ class MockWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -71,7 +71,8 @@ class MockWorker { void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 3c63e9fb6..fea0df303 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -38,9 +38,10 @@ struct Mocker { auto actor_id = ActorID::Of(job_id, RandomTaskId(), 0); auto task_id = TaskID::ForActorCreationTask(actor_id); auto resource = std::unordered_map(); - builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id, - TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource, - resource, PlacementGroupID::Nil()); + builder.SetCommonTaskSpec(task_id, name + ":" + empty_descriptor->CallString(), + Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(), + 0, TaskID::Nil(), owner_address, 1, resource, resource, + PlacementGroupID::Nil()); builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name); return builder.Build(); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index fa2cf15a6..e879bc424 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -150,42 +150,44 @@ message RayException { message TaskSpec { // Type of this task. TaskType type = 1; + // Name of this task. + string name = 2; // Language of this task. - Language language = 2; + Language language = 3; // Function descriptor of this task uniquely describe the function to execute. - FunctionDescriptor function_descriptor = 3; + FunctionDescriptor function_descriptor = 4; // ID of the job that this task belongs to. - bytes job_id = 4; + bytes job_id = 5; // Task ID of the task. - bytes task_id = 5; + bytes task_id = 6; // Task ID of the parent task. - bytes parent_task_id = 6; + bytes parent_task_id = 7; // A count of the number of tasks submitted by the parent task before this one. - uint64 parent_counter = 7; + uint64 parent_counter = 8; // Task ID of the caller. This is the same as parent_task_id for non-actors. // This is the actor ID (embedded in a nil task ID) for actors. - bytes caller_id = 8; + bytes caller_id = 9; /// Address of the caller. - Address caller_address = 9; + Address caller_address = 10; // Task arguments. - repeated TaskArg args = 10; + repeated TaskArg args = 11; // Number of return objects. - uint64 num_returns = 11; + uint64 num_returns = 12; // Quantities of the different resources required by this task. - map required_resources = 12; + map required_resources = 13; // The resources required for placing this task on a node. If this is empty, // then the placement resources are equal to the required_resources. - map required_placement_resources = 13; + map required_placement_resources = 14; // Task specification for an actor creation task. // This field is only valid when `type == ACTOR_CREATION_TASK`. - ActorCreationTaskSpec actor_creation_task_spec = 14; + ActorCreationTaskSpec actor_creation_task_spec = 15; // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. - ActorTaskSpec actor_task_spec = 15; + ActorTaskSpec actor_task_spec = 16; // Number of times this task may be retried on worker failure. - int32 max_retries = 16; + int32 max_retries = 17; // Placement group that is associated with this task. - bytes placement_group_id = 17; + bytes placement_group_id = 18; } message Bundle { @@ -358,34 +360,36 @@ message CoreWorkerStats { int32 num_pending_tasks = 2; // Number of object refs in local scope. int32 num_object_refs_in_scope = 3; + // Name of the currently executing task. + string current_task_name = 4; // String representation of the function descriptor of the currently executing task. - string current_task_func_desc = 4; + string current_task_func_desc = 5; // IP address of the core worker. - string ip_address = 6; + string ip_address = 7; // Port of the core worker. - int64 port = 7; + int64 port = 8; // Actor ID. - bytes actor_id = 8; + bytes actor_id = 9; // A map from the resource name (e.g. "CPU") to its allocation. - map used_resources = 9; + map used_resources = 10; // A string displayed on Dashboard. - map webui_display = 10; + map webui_display = 11; // Number of objects that are IN_PLASMA_ERROR in the local memory store. - int32 num_in_plasma = 11; + int32 num_in_plasma = 12; // Number of objects stored in local memory. - int32 num_local_objects = 12; + int32 num_local_objects = 13; // Used local object store memory. - int64 used_object_store_memory = 13; + int64 used_object_store_memory = 14; // Length of the task queue. - int32 task_queue_length = 14; + int32 task_queue_length = 15; // Number of executed tasks. - int32 num_executed_tasks = 15; + int32 num_executed_tasks = 16; // Actor constructor. - string actor_title = 16; + string actor_title = 17; // Local reference table. - repeated ObjectRefInfo object_refs = 17; + repeated ObjectRefInfo object_refs = 18; // Job ID. - bytes job_id = 18; + bytes job_id = 19; } message MetricPoint { diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 57d329ca1..d09fe612b 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -264,7 +264,7 @@ Task CreateTask(const std::unordered_map &required_resource TaskID id = RandomTaskId(); JobID job_id = RandomJobId(); rpc::Address address; - spec_builder.SetCommonTaskSpec(id, Language::PYTHON, + spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, required_resources, {}, PlacementGroupID::Nil()); @@ -489,4 +489,4 @@ int main(int argc, char **argv) { } } // namespace raylet -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 27129ae50..52378e24d 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -65,7 +65,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; rpc::Address address; - builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, + builder.SetCommonTaskSpec(RandomTaskId(), "example_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, num_returns, {}, {}, PlacementGroupID::Nil()); diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc index 01841b324..cd30955fa 100644 --- a/streaming/src/queue/transport.cc +++ b/streaming/src/queue/transport.cc @@ -12,7 +12,8 @@ void Transport::SendInternal(std::shared_ptr buffer, RayFunction &function, int return_num, std::vector &return_ids) { std::unordered_map resources; - TaskOptions options{return_num, resources}; + std::string name = function.GetFunctionDescriptor()->DefaultTaskName(); + TaskOptions options{name, return_num, resources}; char meta_data[3] = {'R', 'A', 'W'}; std::shared_ptr meta = diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index 0c15d72c6..9bbcad803 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -496,8 +496,8 @@ class StreamingWorker { "", // driver_name "", // stdout_file "", // stderr_file - std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, - _7), // task_execution_callback + std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, + _8), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect nullptr, // spill_objects @@ -521,7 +521,8 @@ class StreamingWorker { } private: - Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + Status ExecuteTask(TaskType task_type, const std::string task_name, + const RayFunction &ray_function, const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index f57c31e10..25c4c0061 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -87,7 +87,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue(std::make_shared( msg.ToBytes(), nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, resources}; + TaskOptions options{"", 0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")}; @@ -103,7 +103,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, resources}; + TaskOptions options{"", 0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", test, "execute_test", "")}; @@ -119,7 +119,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(new TaskArgByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{1, resources}; + TaskOptions options{"", 1, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", "", "check_current_test_status", "")};