adding pylint (#233)

This commit is contained in:
Robert Nishihara
2016-07-08 12:39:11 -07:00
committed by Philipp Moritz
parent d8a621e5cb
commit 191909dd93
7 changed files with 422 additions and 19 deletions
+1 -1
View File
@@ -192,7 +192,7 @@ def subblocks(a, *ranges):
if ranges[i][0] < 0:
raise Exception("Values in the ranges passed to sub_blocks must be at least 0, but the {}th range is {}.".format(i, ranges[i]))
if ranges[i][-1] >= a.num_blocks[i]:
raise Exception("Values in the ranges passed to sub_blocks must be less than the relevant number of blocks, but the {}th range is {}, and a.num_blocks = {}.".format(i, ranges[i], a.num_blocks))
raise Exception("Values in the ranges passed to sub_blocks must be less than the relevant number of blocks, but the {}th range is {}, and a.num_blocks = {}.".format(i, ranges[i], a.num_blocks))
last_index = [r[-1] for r in ranges]
last_block_shape = DistArray.compute_block_shape(last_index, a.shape)
shape = [(len(ranges[i]) - 1) * BLOCK_SIZE + last_block_shape[i] for i in range(a.ndim)]
@@ -1,5 +1,3 @@
from typing import List
import numpy as np
import ray.array.remote as ra
import ray
+1 -1
View File
@@ -52,7 +52,7 @@ def load_tarfile_from_s3(bucket, s3_key, size=[]):
output.write(chunk)
chunk = response["Body"].read(1024 * 8)
output.seek(0) # go to the beginning of the .tar file
tar = tarfile.open(mode= "r", fileobj=output)
tar = tarfile.open(mode="r", fileobj=output)
return load_chunk(tar, size=size if size != [] else None)
@ray.remote([str, List[str], List[int]], [List[ray.ObjRef]])
+1 -1
View File
@@ -51,7 +51,7 @@ def from_primitive(primitive_obj):
return obj
def is_arrow_serializable(value):
return type(value) == np.ndarray and value.dtype.name in ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64", "float32", "float64"]
return isinstance(value, np.ndarray) and value.dtype.name in ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64", "float32", "float64"]
def serialize(worker_capsule, obj):
primitive_obj = to_primitive(obj)
+3 -4
View File
@@ -1,8 +1,7 @@
import subprocess32 as subprocess
import os
import atexit
import time
import datetime
import atexit
import subprocess32 as subprocess
import ray
import worker
@@ -196,7 +195,7 @@ def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_pat
time.sleep(0.1)
objstore_addresses = []
# create objstores
for i in range(num_objstores):
for _ in range(num_objstores):
objstore_address = address(IP_ADDRESS, new_objstore_port())
objstore_addresses.append(objstore_address)
start_objstore(scheduler_address, objstore_address, local=True)
+9 -10
View File
@@ -1,14 +1,13 @@
import time
import datetime
import logging
import os
import time
import traceback
import copy
import logging
from types import ModuleType
import typing
import funcsigs
import numpy as np
import colorama
import copy
import traceback
import ray
import ray.config as config
@@ -147,7 +146,7 @@ def print_task_info(task_data, mode):
print ", ".join(info_strings)
def scheduler_info(worker=global_worker):
return ray.lib.scheduler_info(worker.handle);
return ray.lib.scheduler_info(worker.handle)
def visualize_computation_graph(file_path=None, view=False, worker=global_worker):
"""
@@ -188,7 +187,7 @@ def visualize_computation_graph(file_path=None, view=False, worker=global_worker
def task_info(worker=global_worker):
"""Tell the scheduler to return task information. Currently includes a list of all failed tasks since the start of the cluster."""
return ray.lib.task_info(worker.handle);
return ray.lib.task_info(worker.handle)
def register_module(module, worker=global_worker):
"""
@@ -258,7 +257,7 @@ def kill_workers(worker=global_worker):
"""
success = ray.lib.kill_workers(worker.handle)
if not success:
print "Could not kill all workers; check that there are no tasks currently running."
print "Could not kill all workers. Check that there are no tasks currently running."
return success
def restart_workers_local(num_workers, worker_path, worker=global_worker):
@@ -301,7 +300,7 @@ def main_loop(worker=global_worker):
outputs = worker.functions[func_name].executor(arguments) # execute the function
if len(return_objrefs) == 1:
outputs = (outputs,)
except Exception as e:
except Exception:
exception_message = format_error_message(traceback.format_exc())
# Here we are storing RayFailedObjects in the object store to indicate
# failure (this is only interpreted by the worker).
@@ -374,7 +373,7 @@ def check_signature_supported(function):
if function.has_kwargs_param:
raise "Function {} has a **kwargs argument, which is currently not supported.".format(function.__name__)
# check if the user specified a variable number of arguments and any keyword arguments
if function.has_vararg_param and any([d != funcsigs._empty for k, d in function.keyword_defaults]):
if function.has_vararg_param and any([d != funcsigs._empty for _, d in function.keyword_defaults]):
raise "Function {} has a *args argument as well as a keyword argument, which is currently not supported.".format(function.__name__)