Fix python linting (#2076)

This commit is contained in:
Melih Elibol
2018-05-16 15:04:31 -07:00
committed by Robert Nishihara
parent 88fa98e851
commit bea97b425b
14 changed files with 91 additions and 88 deletions
+3 -3
View File
@@ -75,9 +75,9 @@ class NodeUpdater(object):
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "UpdateFailed"})
if self.logfile is not None:
print("----- BEGIN REMOTE LOGS -----\n" + open(
self.logfile.name).read() + "\n----- END REMOTE LOGS -----"
)
print("----- BEGIN REMOTE LOGS -----\n" +
open(self.logfile.name).read() +
"\n----- END REMOTE LOGS -----")
raise e
self.provider.set_node_tags(
self.node_id, {
@@ -76,10 +76,9 @@ def tsqr(a):
lower = [a.shape[1], 0]
upper = [2 * a.shape[1], core.BLOCK_SIZE]
ith_index //= 2
q_block_current = ra.dot.remote(q_block_current,
ra.subarray.remote(
q_tree[ith_index, j], lower,
upper))
q_block_current = ra.dot.remote(
q_block_current,
ra.subarray.remote(q_tree[ith_index, j], lower, upper))
q_result.objectids[i] = q_block_current
r = current_rs[0]
return q_result, ray.get(r)
@@ -222,10 +221,10 @@ def qr(a):
y_col_block = core.subblocks.remote(y_res, [], [i])
q = core.subtract.remote(
q,
core.dot.remote(y_col_block,
core.dot.remote(
Ts[i],
core.dot.remote(
core.transpose.remote(y_col_block), q))))
core.dot.remote(
y_col_block,
core.dot.remote(
Ts[i],
core.dot.remote(core.transpose.remote(y_col_block), q))))
return ray.get(q), r_res
+4 -4
View File
@@ -749,8 +749,8 @@ class GlobalState(object):
"name":
"SubmitTask",
"args": {},
"id": (parent_info["worker_id"] +
str(micros(min(parent_times))))
"id": (parent_info["worker_id"] + str(
micros(min(parent_times))))
}
full_trace.append(parent)
@@ -824,8 +824,8 @@ class GlobalState(object):
"name":
"SubmitTask",
"args": {},
"id": (parent_info["worker_id"] +
str(micros(min(parent_times))))
"id": (parent_info["worker_id"] + str(
micros(min(parent_times))))
}
full_trace.append(parent)
+6 -6
View File
@@ -451,8 +451,8 @@ def task_completion_time_distribution():
# Create the distribution to plot
distr = []
for task_id, data in tasks.items():
distr.append(
data["store_outputs_end"] - data["get_arguments_start"])
distr.append(data["store_outputs_end"] -
data["get_arguments_start"])
# Create a histogram from the distribution
top, bin_edges = np.histogram(distr, bins="auto")
@@ -520,10 +520,10 @@ def compute_utilizations(abs_earliest,
# Walk over each time bucket that this task intersects, adding the
# amount of time that the task intersects within each bucket
for bucket_idx in range(start_bucket, end_bucket + 1):
bucket_start_time = ((
earliest_time + bucket_idx) * bucket_time_length)
bucket_end_time = ((earliest_time +
(bucket_idx + 1)) * bucket_time_length)
bucket_start_time = (
(earliest_time + bucket_idx) * bucket_time_length)
bucket_end_time = (
(earliest_time + (bucket_idx + 1)) * bucket_time_length)
task_start_time_within_bucket = max(task_start_time,
bucket_start_time)
+3 -2
View File
@@ -39,8 +39,9 @@ class LogMonitor(object):
def update_log_filenames(self):
"""Get the most up-to-date list of log files to monitor from Redis."""
num_current_log_files = len(self.log_files)
new_log_filenames = self.redis_client.lrange("LOG_FILENAMES:{}".format(
self.node_ip_address), num_current_log_files, -1)
new_log_filenames = self.redis_client.lrange(
"LOG_FILENAMES:{}".format(self.node_ip_address),
num_current_log_files, -1)
for log_filename in new_log_filenames:
print("Beginning to track file {}".format(log_filename))
assert log_filename not in self.log_files
+5 -6
View File
@@ -189,10 +189,9 @@ class Monitor(object):
if manager in self.dead_plasma_managers:
# If the object was on a dead plasma manager, remove that
# location entry.
ok = self.state._execute_command(object_id,
"RAY.OBJECT_TABLE_REMOVE",
object_id.id(),
hex_to_binary(manager))
ok = self.state._execute_command(
object_id, "RAY.OBJECT_TABLE_REMOVE", object_id.id(),
hex_to_binary(manager))
if ok != b"OK":
log.warn("Failed to remove object location for dead "
"plasma manager.")
@@ -507,8 +506,8 @@ class Monitor(object):
log.debug("{} dead local schedulers, {} plasma managers total, {} "
"dead plasma managers".format(
len(self.dead_local_schedulers),
(len(self.live_plasma_managers) +
len(self.dead_plasma_managers)),
(len(self.live_plasma_managers) + len(
self.dead_plasma_managers)),
len(self.dead_plasma_managers)))
# Handle messages from the subscription channels.
+3 -3
View File
@@ -391,9 +391,9 @@ class Bracket():
def __repr__(self):
status = ", ".join([
"Max Size (n)={}".format(self._n), "Milestone (r)={}".format(
self._cumul_r), "completed={:.1%}".format(
self.completion_percentage())
"Max Size (n)={}".format(self._n),
"Milestone (r)={}".format(self._cumul_r),
"completed={:.1%}".format(self.completion_percentage())
])
counts = collections.Counter([t.status for t in self._all_trials])
trial_statuses = ", ".join(
+22 -18
View File
@@ -370,12 +370,14 @@ class HyperbandSuite(unittest.TestCase):
mock_runner._launch_trial(t)
sched.on_trial_error(mock_runner, t3)
self.assertEqual(TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t1,
result(stats[str(1)]["r"], 10)))
self.assertEqual(TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t2,
result(stats[str(1)]["r"], 10)))
self.assertEqual(
TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t1,
result(stats[str(1)]["r"], 10)))
self.assertEqual(
TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t2,
result(stats[str(1)]["r"], 10)))
def testTrialErrored2(self):
"""Check successive halving happened even when last trial failed"""
@@ -405,12 +407,14 @@ class HyperbandSuite(unittest.TestCase):
mock_runner._launch_trial(t)
sched.on_trial_complete(mock_runner, t3, result(1, 12))
self.assertEqual(TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t1,
result(stats[str(1)]["r"], 10)))
self.assertEqual(TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t2,
result(stats[str(1)]["r"], 10)))
self.assertEqual(
TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t1,
result(stats[str(1)]["r"], 10)))
self.assertEqual(
TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t2,
result(stats[str(1)]["r"], 10)))
def testTrialEndedEarly2(self):
"""Check successive halving happened even when last trial failed"""
@@ -449,13 +453,13 @@ class HyperbandSuite(unittest.TestCase):
self.assertEqual(len(sched._state["bracket"].current_trials()), 2)
# Make sure that newly added trial gets fair computation (not just 1)
self.assertEqual(TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t,
result(init_units, 12)))
self.assertEqual(
TrialScheduler.CONTINUE,
sched.on_trial_result(mock_runner, t, result(init_units, 12)))
new_units = init_units + int(init_units * sched._eta)
self.assertEqual(TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t,
result(new_units, 12)))
self.assertEqual(
TrialScheduler.PAUSE,
sched.on_trial_result(mock_runner, t, result(new_units, 12)))
def testAlternateMetrics(self):
"""Checking that alternate metrics will pass."""
+10 -7
View File
@@ -174,8 +174,8 @@ class Trial(object):
try:
if error_msg and self.logdir:
self.num_failures += 1
error_file = os.path.join(self.logdir, "error_{}.txt".format(
date_str()))
error_file = os.path.join(self.logdir,
"error_{}.txt".format(date_str()))
with open(error_file, "w") as f:
f.write(error_msg)
self.error_file = error_file
@@ -259,9 +259,10 @@ class Trial(object):
return '{} pid={}'.format(hostname, pid)
pieces = [
'{} [{}]'.format(self._status_string(),
location_string(self.last_result.hostname,
self.last_result.pid)),
'{} [{}]'.format(
self._status_string(),
location_string(self.last_result.hostname,
self.last_result.pid)),
'{} s'.format(int(self.last_result.time_total_s)), '{} ts'.format(
int(self.last_result.timesteps_total))
]
@@ -281,8 +282,10 @@ class Trial(object):
return ', '.join(pieces)
def _status_string(self):
return "{}{}".format(self.status, ", {} failures: {}".format(
self.num_failures, self.error_file) if self.error_file else "")
return "{}{}".format(
self.status, ", {} failures: {}".format(self.num_failures,
self.error_file)
if self.error_file else "")
def has_checkpoint(self):
return self._checkpoint_path is not None or \
+6 -8
View File
@@ -845,10 +845,9 @@ class Worker(object):
e, None)
return
except Exception as e:
self._handle_process_task_failure(function_id, return_object_ids,
e,
ray.utils.format_error_message(
traceback.format_exc()))
self._handle_process_task_failure(
function_id, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
return
# Execute the task.
@@ -881,10 +880,9 @@ class Worker(object):
outputs = (outputs, )
self._store_outputs_in_objstore(return_object_ids, outputs)
except Exception as e:
self._handle_process_task_failure(function_id, return_object_ids,
e,
ray.utils.format_error_message(
traceback.format_exc()))
self._handle_process_task_failure(
function_id, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
def _handle_process_task_failure(self, function_id, return_object_ids,
error, backtrace):
+9 -8
View File
@@ -1038,14 +1038,14 @@ class ActorsWithGPUs(unittest.TestCase):
def locations_to_intervals_for_many_tasks():
# Launch a bunch of GPU tasks.
locations_ids_and_intervals = ray.get([
f1.remote() for _ in range(
5 * num_local_schedulers * num_gpus_per_scheduler)
f1.remote() for _ in range(5 * num_local_schedulers *
num_gpus_per_scheduler)
] + [
f2.remote() for _ in range(
5 * num_local_schedulers * num_gpus_per_scheduler)
f2.remote() for _ in range(5 * num_local_schedulers *
num_gpus_per_scheduler)
] + [
f1.remote() for _ in range(
5 * num_local_schedulers * num_gpus_per_scheduler)
f1.remote() for _ in range(5 * num_local_schedulers *
num_gpus_per_scheduler)
])
locations_to_intervals = collections.defaultdict(lambda: [])
@@ -1108,8 +1108,9 @@ class ActorsWithGPUs(unittest.TestCase):
# Create more actors to fill up all the GPUs.
more_actors = [
Actor1.remote() for _ in range(
num_local_schedulers * num_gpus_per_scheduler - 1 - 3)
Actor1.remote()
for _ in range(num_local_schedulers * num_gpus_per_scheduler - 1 -
3)
]
# Wait for the actors to finish being created.
ray.get([actor.get_location_and_ids.remote() for actor in more_actors])
+6 -5
View File
@@ -67,11 +67,12 @@ class DistributedArrayTest(unittest.TestCase):
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE],
np.array([[a], [b]]))
assert_equal(x.assemble(),
np.vstack([
np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])
]))
assert_equal(
x.assemble(),
np.vstack([
np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])
]))
def testMethods(self):
for module in [
+2 -3
View File
@@ -387,9 +387,8 @@ class PutErrorTest(unittest.TestCase):
# on the one before it. The result of the first task should get
# evicted.
args = []
arg = single_dependency.remote(0,
np.zeros(
object_size, dtype=np.uint8))
arg = single_dependency.remote(
0, np.zeros(object_size, dtype=np.uint8))
for i in range(num_objects):
arg = single_dependency.remote(i, arg)
args.append(arg)
+4 -6
View File
@@ -180,12 +180,10 @@ LIST_OBJECTS = [[obj] for obj in BASE_OBJECTS]
TUPLE_OBJECTS = [(obj, ) for obj in BASE_OBJECTS]
# The check that type(obj).__module__ != "numpy" should be unnecessary, but
# otherwise this seems to fail on Mac OS X on Travis.
DICT_OBJECTS = (
[{
obj: obj
} for obj in PRIMITIVE_OBJECTS
if (obj.__hash__ is not None and type(obj).__module__ != "numpy")] +
[{
DICT_OBJECTS = ([{
obj: obj
} for obj in PRIMITIVE_OBJECTS if (
obj.__hash__ is not None and type(obj).__module__ != "numpy")] + [{
0: obj
} for obj in BASE_OBJECTS] + [{
Foo(123): Foo(456)