Initial Skeleton for Streaming API (#4126)

This commit is contained in:
John Liagouris
2019-02-26 12:15:08 -08:00
committed by Robert Nishihara
parent 62055cc01c
commit 89ce4c56aa
17 changed files with 2461 additions and 4 deletions
@@ -0,0 +1,16 @@
Streaming Library
=================
Dependencies:
Install NetworkX: ``pip install networkx``
Examples:
- simple.py: A simple example with stateless operators and different parallelism per stage.
Run ``python simple.py --input-file toy.txt``
- wordcount.py: A streaming wordcount example with a stateful operator (rolling sum).
Run ``python wordcount.py --titles-file articles.txt``
@@ -0,0 +1,227 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
import threading
import time
import ray
from ray.experimental import internal_kv
logger = logging.getLogger(__name__)
logger.setLevel("INFO")
def plasma_prefetch(object_id):
"""Tells plasma to prefetch the given object_id."""
local_sched_client = ray.worker.global_worker.raylet_client
ray_obj_id = ray.ObjectID(object_id)
local_sched_client.fetch_or_reconstruct([ray_obj_id], True)
def plasma_get(object_id):
"""Get an object directly from plasma without going through object table.
Precondition: plasma_prefetch(object_id) has been called before.
"""
client = ray.worker.global_worker.plasma_client
plasma_id = ray.pyarrow.plasma.ObjectID(object_id)
while not client.contains(plasma_id):
pass
return client.get(plasma_id)
# TODO: doing the timer in Python land is a bit slow
class FlushThread(threading.Thread):
"""A thread that flushes periodically to plasma.
Attributes:
interval: The flush timeout per batch.
flush_fn: The flush function.
"""
def __init__(self, interval, flush_fn):
threading.Thread.__init__(self)
self.interval = interval # Interval is the max_batch_time
self.flush_fn = flush_fn
self.daemon = True
def run(self):
while True:
time.sleep(self.interval) # Flushing period
self.flush_fn()
class BatchedQueue(object):
"""A batched queue for actor to actor communication.
Attributes:
max_size (int): The maximum size of the queue in number of batches
(if exceeded, backpressure kicks in)
max_batch_size (int): The size of each batch in number of records.
max_batch_time (float): The flush timeout per batch.
prefetch_depth (int): The number of batches to prefetch from plasma.
background_flush (bool): Denotes whether a daemon flush thread should
be used (True) to flush batches to plasma.
base (ndarray): A unique signature for the queue.
read_ack_key (bytes): The signature of the queue in bytes.
prefetch_batch_offset (int): The number of the last read prefetched
batch.
read_batch_offset (int): The number of the last read batch.
read_item_offset (int): The number of the last read record inside a
batch.
write_batch_offset (int): The number of the last written batch.
write_item_offset (int): The numebr of the last written item inside a
batch.
write_buffer (list): The write buffer, i.e. an in-memory batch.
last_flush_time (float): The time the last flushing to plasma took
place.
cached_remote_offset (int): The number of the last read batch as
recorded by the writer after the previous flush.
flush_lock (RLock): A python lock used for flushing batches to plasma.
flush_thread (Threading): The python thread used for flushing batches
to plasma.
"""
def __init__(self,
max_size=999999,
max_batch_size=99999,
max_batch_time=0.01,
prefetch_depth=10,
background_flush=True):
self.max_size = max_size
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.prefetch_depth = prefetch_depth
self.background_flush = background_flush
# Common queue metadata -- This serves as the unique id of the queue
self.base = np.random.randint(0, 2**32 - 1, size=5, dtype="uint32")
self.base[-2] = 0
self.base[-1] = 0
self.read_ack_key = np.ndarray.tobytes(self.base)
# Reader state
self.prefetch_batch_offset = 0
self.read_item_offset = 0
self.read_batch_offset = 0
self.read_buffer = []
# Writer state
self.write_item_offset = 0
self.write_batch_offset = 0
self.write_buffer = []
self.last_flush_time = 0.0
self.cached_remote_offset = 0
self.flush_lock = threading.RLock()
self.flush_thread = FlushThread(self.max_batch_time,
self._flush_writes)
def __getstate__(self):
state = dict(self.__dict__)
del state["flush_lock"]
del state["flush_thread"]
del state["write_buffer"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
# This is to enable writing functionality in
# case the queue is not created by the writer
# The reason is that python locks cannot be serialized
def enable_writes(self):
"""Restores the state of the batched queue for writing."""
self.write_buffer = []
self.flush_lock = threading.RLock()
self.flush_thread = FlushThread(self.max_batch_time,
self._flush_writes)
# Batch ids consist of a unique queue id used as prefix along with
# two numbers generated using the batch offset in the queue
def _batch_id(self, batch_offset):
oid = self.base.copy()
oid[-2] = batch_offset // 2**32
oid[-1] = batch_offset % 2**32
return np.ndarray.tobytes(oid)
def _flush_writes(self):
with self.flush_lock:
if not self.write_buffer:
return
batch_id = self._batch_id(self.write_batch_offset)
ray.worker.global_worker.put_object(
ray.ObjectID(batch_id), self.write_buffer)
logger.debug("[writer] Flush batch {} offset {} size {}".format(
self.write_batch_offset, self.write_item_offset,
len(self.write_buffer)))
self.write_buffer = []
self.write_batch_offset += 1
self._wait_for_reader()
self.last_flush_time = time.time()
def _wait_for_reader(self):
"""Checks for backpressure by the downstream reader."""
if self.max_size <= 0: # Unlimited queue
return
if self.write_item_offset - self.cached_remote_offset <= self.max_size:
return # Hasn't reached max size
remote_offset = internal_kv._internal_kv_get(self.read_ack_key)
if remote_offset is None:
# logger.debug("[writer] Waiting for reader to start...")
while remote_offset is None:
time.sleep(0.01)
remote_offset = internal_kv._internal_kv_get(self.read_ack_key)
remote_offset = int(remote_offset)
if self.write_item_offset - remote_offset > self.max_size:
logger.debug(
"[writer] Waiting for reader to catch up {} to {} - {}".format(
remote_offset, self.write_item_offset, self.max_size))
while self.write_item_offset - remote_offset > self.max_size:
time.sleep(0.01)
remote_offset = int(
internal_kv._internal_kv_get(self.read_ack_key))
self.cached_remote_offset = remote_offset
def _read_next_batch(self):
while (self.prefetch_batch_offset <
self.read_batch_offset + self.prefetch_depth):
plasma_prefetch(self._batch_id(self.prefetch_batch_offset))
self.prefetch_batch_offset += 1
self.read_buffer = plasma_get(self._batch_id(self.read_batch_offset))
self.read_batch_offset += 1
logger.debug("[reader] Fetched batch {} offset {} size {}".format(
self.read_batch_offset, self.read_item_offset,
len(self.read_buffer)))
self._ack_reads(self.read_item_offset + len(self.read_buffer))
# Reader acks the key it reads so that writer knows reader's offset.
# This is to cap queue size and simulate backpressure
def _ack_reads(self, offset):
if self.max_size > 0:
internal_kv._internal_kv_put(
self.read_ack_key, offset, overwrite=True)
def put_next(self, item):
with self.flush_lock:
if self.background_flush and not self.flush_thread.is_alive():
logger.debug("[writer] Starting batch flush thread")
self.flush_thread.start()
self.write_buffer.append(item)
self.write_item_offset += 1
if not self.last_flush_time:
self.last_flush_time = time.time()
delay = time.time() - self.last_flush_time
if (len(self.write_buffer) > self.max_batch_size
or delay > self.max_batch_time):
self._flush_writes()
def read_next(self):
if not self.read_buffer:
self._read_next_batch()
assert self.read_buffer
self.read_item_offset += 1
return self.read_buffer.pop(0)
@@ -0,0 +1,182 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import time
import ray
from ray.experimental.streaming.batched_queue import BatchedQueue
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--rounds", default=10, help="the number of experiment rounds")
parser.add_argument(
"--num-queues", default=1, help="the number of queues in the chain")
parser.add_argument(
"--queue-size", default=10000, help="the queue size in number of batches")
parser.add_argument(
"--batch-size", default=1000, help="the batch size in number of elements")
parser.add_argument(
"--flush-timeout", default=0.001, help="the timeout to flush a batch")
parser.add_argument(
"--prefetch-depth",
default=10,
help="the number of batches to prefetch from plasma")
parser.add_argument(
"--background-flush",
default=False,
help="whether to flush in the backrgound or not")
parser.add_argument(
"--max-throughput",
default="inf",
help="maximum read throughput (elements/s)")
@ray.remote
class Node(object):
"""An actor that reads from an input queue and writes to an output queue.
Attributes:
id (int): The id of the actor.
queue (BatchedQueue): The input queue.
out_queue (BatchedQueue): The output queue.
max_reads_per_second (int): The max read throughput (default: inf).
num_reads (int): Number of elements read.
num_writes (int): Number of elements written.
"""
def __init__(self,
id,
in_queue,
out_queue,
max_reads_per_second=float("inf")):
self.id = id
self.queue = in_queue
self.out_queue = out_queue
self.max_reads_per_second = max_reads_per_second
self.num_reads = 0
self.num_writes = 0
self.start = time.time()
def read_write_forever(self):
debug_log = "[actor {}] Reads throttled to {} reads/s"
log = ""
if self.out_queue is not None:
self.out_queue.enable_writes()
log += "[actor {}] Reads/Writes per second {}"
else: # It's just a reader
log += "[actor {}] Reads per second {}"
# Start spinning
expected_value = 0
while True:
start = time.time()
N = 100000
for _ in range(N):
x = self.queue.read_next()
assert x == expected_value, (x, expected_value)
expected_value += 1
self.num_reads += 1
if self.out_queue is not None:
self.out_queue.put_next(x)
self.num_writes += 1
while (self.num_reads / (time.time() - self.start) >
self.max_reads_per_second):
logger.debug(
debug_log.format(self.id, self.max_reads_per_second))
time.sleep(0.1)
logger.info(log.format(self.id, N / (time.time() - start)))
# Flush any remaining elements
if self.out_queue is not None:
self.out_queue._flush_writes()
def test_max_throughput(rounds,
max_queue_size,
max_batch_size,
batch_timeout,
prefetch_depth,
background_flush,
num_queues,
max_reads_per_second=float("inf")):
assert num_queues >= 1
first_queue = BatchedQueue(
max_size=max_queue_size,
max_batch_size=max_batch_size,
max_batch_time=batch_timeout,
prefetch_depth=prefetch_depth,
background_flush=background_flush)
previous_queue = first_queue
for i in range(num_queues):
# Construct the batched queue
in_queue = previous_queue
out_queue = None
if i < num_queues - 1:
out_queue = BatchedQueue(
max_size=max_queue_size,
max_batch_size=max_batch_size,
max_batch_time=batch_timeout,
prefetch_depth=prefetch_depth,
background_flush=background_flush)
node = Node.remote(i, in_queue, out_queue, max_reads_per_second)
node.read_write_forever.remote()
previous_queue = out_queue
value = 0
# Feed the chain
for round in range(rounds):
logger.info("Round {}".format(round))
N = 100000
start = time.time()
for i in range(N):
first_queue.put_next(value)
value += 1
log = "[writer] Puts per second {}"
logger.info(log.format(N / (time.time() - start)))
first_queue._flush_writes()
if __name__ == "__main__":
ray.init()
ray.register_custom_serializer(BatchedQueue, use_pickle=True)
args = parser.parse_args()
rounds = int(args.rounds)
max_queue_size = int(args.queue_size)
max_batch_size = int(args.batch_size)
batch_timeout = float(args.flush_timeout)
prefetch_depth = int(args.prefetch_depth)
background_flush = bool(args.background_flush)
num_queues = int(args.num_queues)
max_reads_per_second = float(args.max_throughput)
logger.info("== Parameters ==")
logger.info("Rounds: {}".format(rounds))
logger.info("Max queue size: {}".format(max_queue_size))
logger.info("Max batch size: {}".format(max_batch_size))
logger.info("Batch timeout: {}".format(batch_timeout))
logger.info("Prefetch depth: {}".format(prefetch_depth))
logger.info("Background flush: {}".format(background_flush))
logger.info("Max read throughput: {}".format(max_reads_per_second))
# Estimate the ideal throughput
value = 0
start = time.time()
for round in range(rounds):
N = 100000
for _ in range(N):
value += 1
logger.info("Ideal throughput: {}".format(value / (time.time() - start)))
logger.info("== Testing max throughput ==")
start = time.time()
test_max_throughput(rounds, max_queue_size, max_batch_size, batch_timeout,
prefetch_depth, background_flush, num_queues,
max_reads_per_second)
logger.info("Elapsed time: {}".format(time.time() - start))
@@ -0,0 +1,359 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import hashlib
import logging
import sys
from ray.experimental.streaming.operator import PStrategy
from ray.experimental.streaming.batched_queue import BatchedQueue
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Forward and broadcast stream partitioning strategies
forward_broadcast_strategies = [PStrategy.Forward, PStrategy.Broadcast]
# Used to choose output channel in case of hash-based shuffling
def _hash(value):
if isinstance(value, int):
return value
try:
return int(hashlib.sha1(value.encode("utf-8")).hexdigest(), 16)
except AttributeError:
return int(hashlib.sha1(value).hexdigest(), 16)
# A data channel is a batched queue between two
# operator instances in a streaming environment
class DataChannel(object):
"""A data channel for actor-to-actor communication.
Attributes:
env (Environment): The environment the channel belongs to.
src_operator_id (UUID): The id of the source operator of the channel.
dst_operator_id (UUID): The id of the destination operator of the
channel.
src_instance_id (int): The id of the source instance.
dst_instance_id (int): The id of the destination instance.
queue (BatchedQueue): The batched queue used for data movement.
"""
def __init__(self, env, src_operator_id, dst_operator_id, src_instance_id,
dst_instance_id):
self.env = env
self.src_operator_id = src_operator_id
self.dst_operator_id = dst_operator_id
self.src_instance_id = src_instance_id
self.dst_instance_id = dst_instance_id
self.queue = BatchedQueue(
max_size=self.env.config.queue_config.max_size,
max_batch_size=self.env.config.queue_config.max_batch_size,
max_batch_time=self.env.config.queue_config.max_batch_time,
prefetch_depth=self.env.config.queue_config.prefetch_depth,
background_flush=self.env.config.queue_config.background_flush)
def __repr__(self):
return "({},{},{},{})".format(
self.src_operator_id, self.dst_operator_id, self.src_instance_id,
self.dst_instance_id)
# Pulls and merges data from multiple input channels
class DataInput(object):
"""An input gate of an operator instance.
The input gate pulls records from all input channels in a round-robin
fashion.
Attributes:
input_channels (list): The list of input channels.
channel_index (int): The index of the next channel to pull from.
max_index (int): The number of input channels.
closed (list): A list of flags indicating whether an input channel
has been marked as 'closed'.
all_closed (bool): Denotes whether all input channels have been
closed (True) or not (False).
"""
def __init__(self, channels):
self.input_channels = channels
self.channel_index = 0
self.max_index = len(channels)
self.closed = [False] * len(
self.input_channels) # Tracks the channels that have been closed
self.all_closed = False
# Fetches records from input channels in a round-robin fashion
# TODO (john): Make sure the instance is not blocked on any of its input
# channels
# TODO (john): In case of input skew, it might be better to pull from
# the largest queue more often
def _pull(self):
while True:
if self.max_index == 0:
# TODO (john): We should detect this earlier
return None
# Channel to pull from
channel = self.input_channels[self.channel_index]
self.channel_index += 1
if self.channel_index == self.max_index: # Reset channel index
self.channel_index = 0
if self.closed[self.channel_index - 1]:
continue # Channel has been 'closed', check next
record = channel.queue.read_next()
logger.debug("Actor ({},{}) pulled '{}'.".format(
channel.src_operator_id, channel.src_instance_id, record))
if record is None:
# Mark channel as 'closed' and pull from the next open one
self.closed[self.channel_index - 1] = True
self.all_closed = True
for flag in self.closed:
if flag is False:
self.all_closed = False
break
if not self.all_closed:
continue
# Returns 'None' iff all input channels are 'closed'
return record
# Selects output channel(s) and pushes data
class DataOutput(object):
"""An output gate of an operator instance.
The output gate pushes records to output channels according to the
user-defined partitioning scheme.
Attributes:
partitioning_schemes (dict): A mapping from destination operator ids
to partitioning schemes (see: PScheme in operator.py).
forward_channels (list): A list of channels to forward records.
shuffle_channels (list(list)): A list of output channels to shuffle
records grouped by destination operator.
shuffle_key_channels (list(list)): A list of output channels to
shuffle records by a key grouped by destination operator.
shuffle_exists (bool): A flag indicating that there exists at least
one shuffle_channel.
shuffle_key_exists (bool): A flag indicating that there exists at
least one shuffle_key_channel.
"""
def __init__(self, channels, partitioning_schemes):
self.key_selector = None
self.round_robin_indexes = [0]
self.partitioning_schemes = partitioning_schemes
# Prepare output -- collect channels by type
self.forward_channels = [] # Forward and broadcast channels
slots = sum(1 for scheme in self.partitioning_schemes.values()
if scheme.strategy == PStrategy.RoundRobin)
self.round_robin_channels = [[]] * slots # RoundRobin channels
self.round_robin_indexes = [-1] * slots
slots = sum(1 for scheme in self.partitioning_schemes.values()
if scheme.strategy == PStrategy.Shuffle)
# Flag used to avoid hashing when there is no shuffling
self.shuffle_exists = slots > 0
self.shuffle_channels = [[]] * slots # Shuffle channels
slots = sum(1 for scheme in self.partitioning_schemes.values()
if scheme.strategy == PStrategy.ShuffleByKey)
# Flag used to avoid hashing when there is no shuffling by key
self.shuffle_key_exists = slots > 0
self.shuffle_key_channels = [[]] * slots # Shuffle by key channels
# Distinct shuffle destinations
shuffle_destinations = {}
# Distinct shuffle by key destinations
shuffle_by_key_destinations = {}
# Distinct round robin destinations
round_robin_destinations = {}
index_1 = 0
index_2 = 0
index_3 = 0
for channel in channels:
p_scheme = self.partitioning_schemes[channel.dst_operator_id]
strategy = p_scheme.strategy
if strategy in forward_broadcast_strategies:
self.forward_channels.append(channel)
elif strategy == PStrategy.Shuffle:
pos = shuffle_destinations.setdefault(channel.dst_operator_id,
index_1)
self.shuffle_channels[pos].append(channel)
if pos == index_1:
index_1 += 1
elif strategy == PStrategy.ShuffleByKey:
pos = shuffle_by_key_destinations.setdefault(
channel.dst_operator_id, index_2)
self.shuffle_key_channels[pos].append(channel)
if pos == index_2:
index_2 += 1
elif strategy == PStrategy.RoundRobin:
pos = round_robin_destinations.setdefault(
channel.dst_operator_id, index_3)
self.round_robin_channels[pos].append(channel)
if pos == index_3:
index_3 += 1
else: # TODO (john): Add support for other strategies
sys.exit("Unrecognized or unsupported partitioning strategy.")
# A KeyedDataStream can only be shuffled by key
assert not (self.shuffle_exists and self.shuffle_key_exists)
# Flushes any remaining records in the output channels
# 'close' indicates whether we should also 'close' the channel (True)
# by propagating 'None'
# or just flush the remaining records to plasma (False)
def _flush(self, close=False):
"""Flushes remaining output records in the output queues to plasma.
None is used as special type of record that is propagated from sources
to sink to notify that the end of data in a stream.
Attributes:
close (bool): A flag denoting whether the channel should be
also marked as 'closed' (True) or not (False) after flushing.
"""
for channel in self.forward_channels:
if close is True:
channel.queue.put_next(None)
channel.queue._flush_writes()
for channels in self.shuffle_channels:
for channel in channels:
if close is True:
channel.queue.put_next(None)
channel.queue._flush_writes()
for channels in self.shuffle_key_channels:
for channel in channels:
if close is True:
channel.queue.put_next(None)
channel.queue._flush_writes()
for channels in self.round_robin_channels:
for channel in channels:
if close is True:
channel.queue.put_next(None)
channel.queue._flush_writes()
# TODO (john): Add more channel types
# Returns all destination actor ids
def _destination_actor_ids(self):
destinations = []
for channel in self.forward_channels:
destinations.append((channel.dst_operator_id,
channel.dst_instance_id))
for channels in self.shuffle_channels:
for channel in channels:
destinations.append((channel.dst_operator_id,
channel.dst_instance_id))
for channels in self.shuffle_key_channels:
for channel in channels:
destinations.append((channel.dst_operator_id,
channel.dst_instance_id))
for channels in self.round_robin_channels:
for channel in channels:
destinations.append((channel.dst_operator_id,
channel.dst_instance_id))
# TODO (john): Add more channel types
return destinations
# Pushes the record to the output
# Each individual output queue flushes batches to plasma periodically
# based on 'batch_max_size' and 'batch_max_time'
def _push(self, record):
# Forward record
for channel in self.forward_channels:
logger.debug("[writer] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
# Forward record
index = 0
for channels in self.round_robin_channels:
self.round_robin_indexes[index] += 1
if self.round_robin_indexes[index] == len(channels):
self.round_robin_indexes[index] = 0 # Reset index
channel = channels[self.round_robin_indexes[index]]
logger.debug("[writer] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
index += 1
# Hash-based shuffling by key
if self.shuffle_key_exists:
key, _ = record
h = _hash(key)
for channels in self.shuffle_key_channels:
num_instances = len(channels) # Downstream instances
channel = channels[h % num_instances]
logger.debug(
"[key_shuffle] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
elif self.shuffle_exists: # Hash-based shuffling per destination
h = _hash(record)
for channels in self.shuffle_channels:
num_instances = len(channels) # Downstream instances
channel = channels[h % num_instances]
logger.debug("[shuffle] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
else: # TODO (john): Handle rescaling
pass
# Pushes a list of records to the output
# Each individual output queue flushes batches to plasma periodically
# based on 'batch_max_size' and 'batch_max_time'
def _push_all(self, records):
# Forward records
for record in records:
for channel in self.forward_channels:
logger.debug("[writer] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
# Hash-based shuffling by key per destination
if self.shuffle_key_exists:
for record in records:
key, _ = record
h = _hash(key)
for channels in self.shuffle_channels:
num_instances = len(channels) # Downstream instances
channel = channels[h % num_instances]
logger.debug(
"[key_shuffle] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
elif self.shuffle_exists: # Hash-based shuffling per destination
for record in records:
h = _hash(record)
for channels in self.shuffle_channels:
num_instances = len(channels) # Downstream instances
channel = channels[h % num_instances]
logger.debug(
"[shuffle] Push record '{}' to channel {}".format(
record, channel))
channel.queue.put_next(record)
else: # TODO (john): Handle rescaling
pass
# Batched queue configuration
class QueueConfig(object):
"""The configuration of a batched queue.
Attributes:
max_size (int): The maximum size of the queue in number of batches
(if exceeded, backpressure kicks in).
max_batch_size (int): The size of each batch in number of records.
max_batch_time (float): The flush timeout per batch.
prefetch_depth (int): The number of batches to prefetch from plasma.
background_flush (bool): Denotes whether a daemon flush thread should
be used (True) to flush batches to plasma.
"""
def __init__(self,
max_size=999999,
max_batch_size=99999,
max_batch_time=0.01,
prefetch_depth=10,
background_flush=False):
self.max_size = max_size
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.prefetch_depth = prefetch_depth
self.background_flush = background_flush
@@ -0,0 +1,8 @@
New York City
Berlin
London
Paris
United States
Germany
France
United Kingdom
@@ -0,0 +1,76 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import time
import ray
from ray.experimental.streaming.streaming import Environment
from ray.experimental.streaming.batched_queue import BatchedQueue
from ray.experimental.streaming.operator import OpType, PStrategy
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument("--input-file", required=True, help="the input text file")
# A class used to check attribute-based key selection
class Record(object):
def __init__(self, record):
k, _ = record
self.word = k
self.record = record
# Splits input line into words and outputs objects of type Record
# each one consisting of a key (word) and a tuple (word,1)
def splitter(line):
records = []
words = line.split()
for w in words:
records.append(Record((w, 1)))
return records
# Receives an object of type Record and returns the actual tuple
def as_tuple(record):
return record.record
if __name__ == "__main__":
# Get program parameters
args = parser.parse_args()
input_file = str(args.input_file)
ray.init()
ray.register_custom_serializer(Record, use_dict=True)
ray.register_custom_serializer(BatchedQueue, use_pickle=True)
ray.register_custom_serializer(OpType, use_pickle=True)
ray.register_custom_serializer(PStrategy, use_pickle=True)
# A Ray streaming environment with the default configuration
env = Environment()
env.set_parallelism(2) # Each operator will be executed by two actors
# 'key_by("word")' physically partitions the stream of records
# based on the hash value of the 'word' attribute (see Record class above)
# 'map(as_tuple)' maps a record of type Record into a tuple
# 'sum(1)' sums the 2nd element of the tuple, i.e. the word count
stream = env.read_text_file(input_file) \
.round_robin() \
.flat_map(splitter) \
.key_by("word") \
.map(as_tuple) \
.sum(1) \
.inspect(print) # Prints the content of the
# stream to stdout
start = time.time()
env_handle = env.execute() # Deploys and executes the dataflow
ray.get(env_handle) # Stay alive until execution finishes
end = time.time()
logger.info("Elapsed time: {} secs".format(end - start))
logger.debug("Output stream id: {}".format(stream.id))
@@ -0,0 +1,59 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import time
import ray
from ray.experimental.streaming.streaming import Environment
from ray.experimental.streaming.batched_queue import BatchedQueue
from ray.experimental.streaming.operator import OpType, PStrategy
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument("--input-file", required=True, help="the input text file")
# Test functions
def splitter(line):
return line.split()
def filter_fn(word):
if "f" in word:
return True
return False
if __name__ == "__main__":
args = parser.parse_args()
ray.init()
ray.register_custom_serializer(BatchedQueue, use_pickle=True)
ray.register_custom_serializer(OpType, use_pickle=True)
ray.register_custom_serializer(PStrategy, use_pickle=True)
# A Ray streaming environment with the default configuration
env = Environment()
# Stream represents the ouput of the filter and
# can be forked into other dataflows
stream = env.read_text_file(args.input_file) \
.shuffle() \
.flat_map(splitter) \
.set_parallelism(4) \
.filter(filter_fn) \
.set_parallelism(2) \
.inspect(print) # Prints the contents of the
# stream to stdout
start = time.time()
env_handle = env.execute()
ray.get(env_handle) # Stay alive until execution finishes
end = time.time()
logger.info("Elapsed time: {} secs".format(end - start))
logger.debug("Output stream id: {}".format(stream.id))
@@ -0,0 +1,5 @@
This is
a test file
to test if example
works
fine
@@ -0,0 +1,113 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import time
import wikipedia
import ray
from ray.experimental.streaming.streaming import Environment
from ray.experimental.streaming.batched_queue import BatchedQueue
from ray.experimental.streaming.operator import OpType, PStrategy
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--titles-file",
required=True,
help="the file containing the wikipedia titles to lookup")
# A custom data source that reads articles from wikipedia
# Custom data sources need to implement a get_next() method
# that returns the next data element, in this case sentences
class Wikipedia(object):
def __init__(self, title_file):
# Titles in this file will be as queries
self.title_file = title_file
# TODO (john): Handle possible exception here
self.title_reader = iter(list(open(self.title_file, "r").readlines()))
self.done = False
self.article_done = True
self.sentences = iter([])
# Returns next sentence from a wikipedia article
def get_next(self):
if self.done:
return None # Source exhausted
while True:
if self.article_done:
try: # Try next title
next_title = next(self.title_reader)
except StopIteration:
self.done = True # Source exhausted
return None
# Get next article
logger.debug("Next article: {}".format(next_title))
article = wikipedia.page(next_title).content
# Split article in sentences
self.sentences = iter(article.split("."))
self.article_done = False
try: # Try next sentence
sentence = next(self.sentences)
logger.debug("Next sentence: {}".format(sentence))
return sentence
except StopIteration:
self.article_done = True
# Splits input line into words and
# outputs records of the form (word,1)
def splitter(line):
records = []
words = line.split()
for w in words:
records.append((w, 1))
return records
# Returns the first attribute of a tuple
def key_selector(tuple):
return tuple[0]
# Returns the second attribute of a tuple
def attribute_selector(tuple):
return tuple[1]
if __name__ == "__main__":
# Get program parameters
args = parser.parse_args()
titles_file = str(args.titles_file)
ray.init()
ray.register_custom_serializer(BatchedQueue, use_pickle=True)
ray.register_custom_serializer(OpType, use_pickle=True)
ray.register_custom_serializer(PStrategy, use_pickle=True)
# A Ray streaming environment with the default configuration
env = Environment()
env.set_parallelism(2) # Each operator will be executed by two actors
# The following dataflow is a simple streaming wordcount
# with a rolling sum operator.
# It reads articles from wikipedia, splits them in words,
# shuffles words, and counts the occurences of each word.
stream = env.source(Wikipedia(titles_file)) \
.round_robin() \
.flat_map(splitter) \
.key_by(key_selector) \
.sum(attribute_selector) \
.inspect(print) # Prints the contents of the
# stream to stdout
start = time.time()
env_handle = env.execute() # Deploys and executes the dataflow
ray.get(env_handle) # Stay alive until execution finishes
end = time.time()
logger.info("Elapsed time: {} secs".format(end - start))
logger.debug("Output stream id: {}".format(stream.id))
@@ -0,0 +1,105 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import enum
import logging
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")
# Stream partitioning schemes
class PScheme(object):
def __init__(self, strategy, partition_fn=None):
self.strategy = strategy
self.partition_fn = partition_fn
def __repr__(self):
return "({},{})".format(self.strategy, self.partition_fn)
# Partitioning strategies
class PStrategy(enum.Enum):
Forward = 0 # Default
Shuffle = 1
Rescale = 2
RoundRobin = 3
Broadcast = 4
Custom = 5
ShuffleByKey = 6
# ...
# Operator types
class OpType(enum.Enum):
Source = 0
Map = 1
FlatMap = 2
Filter = 3
TimeWindow = 4
KeyBy = 5
Sink = 6
WindowJoin = 7
Inspect = 8
ReadTextFile = 9
Reduce = 10
Sum = 11
# ...
# A logical dataflow operator
class Operator(object):
def __init__(self,
id,
type,
name="",
logic=None,
num_instances=1,
other=None,
state_actor=None):
self.id = id
self.type = type
self.name = name
self.logic = logic # The operator's logic
self.num_instances = num_instances
# One partitioning strategy per downstream operator (default: forward)
self.partitioning_strategies = {}
self.other_args = other # Depends on the type of the operator
self.state_actor = state_actor # Actor to query state
# Sets the partitioning scheme for an output stream of the operator
def _set_partition_strategy(self,
stream_id,
partitioning_scheme,
dest_operator=None):
self.partitioning_strategies[stream_id] = (partitioning_scheme,
dest_operator)
# Retrieves the partitioning scheme for the given
# output stream of the operator
# Returns None is no strategy has been defined for the particular stream
def _get_partition_strategy(self, stream_id):
return self.partitioning_strategies.get(stream_id)
# Cleans metatada from all partitioning strategies that lack a
# destination operator
# Valid entries are re-organized as
# 'destination operator id -> partitioning scheme'
# Should be called only after the logical dataflow has been constructed
def _clean(self):
strategies = {}
for _, v in self.partitioning_strategies.items():
strategy, destination_operator = v
if destination_operator is not None:
strategies.setdefault(destination_operator, strategy)
self.partitioning_strategies = strategies
def print(self):
log = "Operator<\nID = {}\nName = {}\nType = {}\n"
log += "Logic = {}\nNumber_of_Instances = {}\n"
log += "Partitioning_Scheme = {}\nOther_Args = {}>\n"
logger.debug(
log.format(self.id, self.name, self.type, self.logic,
self.num_instances, self.partitioning_strategies,
self.other_args))
@@ -0,0 +1,365 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import sys
import time
import types
import ray
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")
#
# Each Ray actor corresponds to an operator instance in the physical dataflow
# Actors communicate using batched queues as data channels (no standing TCP
# connections)
# Currently, batched queues are based on Eric's implementation (see:
# batched_queue.py)
def _identity(element):
return element
# TODO (john): Specify the interface of state keepers
class OperatorInstance(object):
"""A streaming operator instance.
Attributes:
instance_id (UUID): The id of the instance.
input (DataInput): The input gate that manages input channels of
the instance (see: DataInput in communication.py).
input (DataOutput): The output gate that manages output channels of
the instance (see: DataOutput in communication.py).
state_keepers (list): A list of actor handlers to query the state of
the operator instance.
"""
def __init__(self, instance_id, input_gate, output_gate,
state_keeper=None):
self.key_index = None # Index for key selection
self.key_attribute = None # Attribute name for key selection
self.instance_id = instance_id
self.input = input_gate
self.output = output_gate
# Handle(s) to one or more user-defined actors
# that can retrieve actor's state
self.state_keeper = state_keeper
# Enable writes
for channel in self.output.forward_channels:
channel.queue.enable_writes()
for channels in self.output.shuffle_channels:
for channel in channels:
channel.queue.enable_writes()
for channels in self.output.shuffle_key_channels:
for channel in channels:
channel.queue.enable_writes()
for channels in self.output.round_robin_channels:
for channel in channels:
channel.queue.enable_writes()
# TODO (john): Add more channel types here
# Registers actor's handle so that the actor can schedule itself
def register_handle(self, actor_handle):
self.this_actor = actor_handle
# Used for index-based key extraction, e.g. for tuples
def index_based_selector(self, record):
return record[self.key_index]
# Used for attribute-based key extraction, e.g. for classes
def attribute_based_selector(self, record):
return vars(record)[self.key_attribute]
# Starts the actor
def start(self):
pass
# A source actor that reads a text file line by line
@ray.remote
class ReadTextFile(OperatorInstance):
"""A source operator instance that reads a text file line by line.
Attributes:
filepath (string): The path to the input file.
"""
def __init__(self,
instance_id,
operator_metadata,
input_gate,
output_gate,
state_keepers=None):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate,
state_keepers)
self.filepath = operator_metadata.other_args
# TODO (john): Handle possible exception here
self.reader = open(self.filepath, "r")
# Read input file line by line
def start(self):
while True:
record = self.reader.readline()
# Reader returns empty string ('') on EOF
if not record:
# Flush any remaining records to plasma and close the file
self.output._flush(close=True)
self.reader.close()
return
self.output._push(
record[:-1]) # Push after removing newline characters
# Map actor
@ray.remote
class Map(OperatorInstance):
"""A map operator instance that applies a user-defined
stream transformation.
A map produces exactly one output record for each record in
the input stream.
Attributes:
map_fn (function): The user-defined function.
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
self.map_fn = operator_metadata.logic
# Applies the mapper each record of the input stream(s)
# and pushes resulting records to the output stream(s)
def start(self):
start = time.time()
elements = 0
while True:
record = self.input._pull()
if record is None:
self.output._flush(close=True)
logger.debug("[map {}] read/writes per second: {}".format(
self.instance_id, elements / (time.time() - start)))
return
self.output._push(self.map_fn(record))
elements += 1
# Flatmap actor
@ray.remote
class FlatMap(OperatorInstance):
"""A map operator instance that applies a user-defined
stream transformation.
A flatmap produces one or more output records for each record in
the input stream.
Attributes:
flatmap_fn (function): The user-defined function.
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
self.flatmap_fn = operator_metadata.logic
# Applies the splitter to the records of the input stream(s)
# and pushes resulting records to the output stream(s)
def start(self):
while True:
record = self.input._pull()
if record is None:
self.output._flush(close=True)
return
self.output._push_all(self.flatmap_fn(record))
# Filter actor
@ray.remote
class Filter(OperatorInstance):
"""A filter operator instance that applies a user-defined filter to
each record of the stream.
Output records are those that pass the filter, i.e. those for which
the filter function returns True.
Attributes:
filter_fn (function): The user-defined boolean function.
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
self.filter_fn = operator_metadata.logic
# Applies the filter to the records of the input stream(s)
# and pushes resulting records to the output stream(s)
def start(self):
while True:
record = self.input._pull()
if record is None: # Close channel and return
self.output._flush(close=True)
return
if self.filter_fn(record):
self.output._push(record)
# Inspect actor
@ray.remote
class Inspect(OperatorInstance):
"""A inspect operator instance that inspects the content of the stream.
Inspect is useful for printing the records in the stream.
Attributes:
inspect_fn (function): The user-defined inspect logic.
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
self.inspect_fn = operator_metadata.logic
# Applies the inspect logic (e.g. print) to the records of
# the input stream(s)
# and leaves stream unaffected by simply pushing the records to
# the output stream(s)
while True:
record = self.input._pull()
if record is None:
self.output._flush(close=True)
return
self.output._push(record)
self.inspect_fn(record)
# Reduce actor
@ray.remote
class Reduce(OperatorInstance):
"""A reduce operator instance that combines a new value for a key
with the last reduced one according to a user-defined logic.
Attributes:
reduce_fn (function): The user-defined reduce logic.
value_attribute (int): The index of the value to reduce
(assuming tuple records).
state (dict): A mapping from keys to values.
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate,
operator_metadata.state_actor)
self.reduce_fn = operator_metadata.logic
# Set the attribute selector
self.attribute_selector = operator_metadata.other_args
if self.attribute_selector is None:
self.attribute_selector = _identity
elif isinstance(self.attribute_selector, int):
self.key_index = self.attribute_selector
self.attribute_selector = self.index_based_selector
elif isinstance(self.attribute_selector, str):
self.key_attribute = self.attribute_selector
self.attribute_selector = self.attribute_based_selector
elif not isinstance(self.attribute_selector, types.FunctionType):
sys.exit("Unrecognized or unsupported key selector.")
self.state = {} # key -> value
# Combines the input value for a key with the last reduced
# value for that key to produce a new value.
# Outputs the result as (key,new value)
def start(self):
while True:
record = self.input._pull()
if record is None:
self.output._flush(close=True)
del self.state
return
key, rest = record
new_value = self.attribute_selector(rest)
# TODO (john): Is there a way to update state with
# a single dictionary lookup?
try:
old_value = self.state[key]
new_value = self.reduce_fn(old_value, new_value)
self.state[key] = new_value
except KeyError: # Key does not exist in state
self.state.setdefault(key, new_value)
self.output._push((key, new_value))
# Returns the state of the actor
def get_state(self):
return self.state
@ray.remote
class KeyBy(OperatorInstance):
"""A key_by operator instance that physically partitions the
stream based on a key.
Attributes:
key_attribute (int): The index of the value to reduce
(assuming tuple records).
"""
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
# Set the key selector
self.key_selector = operator_metadata.other_args
if isinstance(self.key_selector, int):
self.key_index = self.key_selector
self.key_selector = self.index_based_selector
elif isinstance(self.key_selector, str):
self.key_attribute = self.key_selector
self.key_selector = self.attribute_based_selector
elif not isinstance(self.key_selector, types.FunctionType):
sys.exit("Unrecognized or unsupported key selector.")
# The actual partitioning is done by the output gate
def start(self):
while True:
record = self.input._pull()
if record is None:
self.output._flush(close=True)
return
key = self.key_selector(record)
self.output._push((key, record))
# A custom source actor
@ray.remote
class Source(OperatorInstance):
def __init__(self, instance_id, operator_metadata, input_gate,
output_gate):
OperatorInstance.__init__(self, instance_id, input_gate, output_gate)
# The user-defined source with a get_next() method
self.source = operator_metadata.other_args
# Starts the source by calling get_next() repeatedly
def start(self):
start = time.time()
elements = 0
while True:
next = self.source.get_next()
if next is None:
self.output._flush(close=True)
logger.debug("[writer {}] puts per second: {}".format(
self.instance_id, elements / (time.time() - start)))
return
self.output._push(next)
elements += 1
# TODO(john): Time window actor (uses system time)
@ray.remote
class TimeWindow(OperatorInstance):
def __init__(self, queue, width):
self.width = width # In milliseconds
def time_window(self):
while True:
pass
@@ -0,0 +1,668 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import sys
import uuid
import networkx as nx
from ray.experimental.streaming.communication import DataChannel, DataInput
from ray.experimental.streaming.communication import DataOutput, QueueConfig
from ray.experimental.streaming.operator import Operator, OpType
from ray.experimental.streaming.operator import PScheme, PStrategy
import ray.experimental.streaming.operator_instance as operator_instance
logger = logging.getLogger(__name__)
logger.setLevel("INFO")
# Generates UUIDs
def _generate_uuid():
return uuid.uuid4()
# Rolling sum's logic
def _sum(value_1, value_2):
return value_1 + value_2
# Partitioning strategies that require all-to-all instance communication
all_to_all_strategies = [
PStrategy.Shuffle, PStrategy.ShuffleByKey, PStrategy.Broadcast,
PStrategy.RoundRobin
]
# Environment configuration
class Config(object):
"""Environment configuration.
This class includes all information about the configuration of the
streaming environment.
Attributes:
queue_config (QueueConfig): Batched Queue configuration
(see: communication.py)
A batched queue configuration includes the max queue size,
the size of each batch (in number of elements), the batch flush
timeout, and the number of batches to prefetch from plasma
parallelism (int): The number of isntances (actors) for each logical
dataflow operator (default: 1)
"""
def __init__(self, parallelism=1):
self.queue_config = QueueConfig()
self.parallelism = parallelism
# ...
# The execution environment for a streaming job
class Environment(object):
"""A streaming environment.
This class is responsible for constructing the logical and the
physical dataflow.
Attributes:
logical_topo (DiGraph): The user-defined logical topology in
NetworkX DiGRaph format.
(See: https://networkx.github.io)
physical_topo (DiGraph): The physical topology in NetworkX
DiGRaph format. The physical dataflow is constructed by the
environment based on logical_topo.
operators (dict): A mapping from operator ids to operator metadata
(See: Operator in operator.py).
config (Config): The environment's configuration.
topo_cleaned (bool): A flag that indicates whether the logical
topology is garbage collected (True) or not (False).
actor_handles (list): A list of all Ray actor handles that execute
the streaming dataflow.
"""
def __init__(self, config=Config()):
self.logical_topo = nx.DiGraph() # DAG
self.physical_topo = nx.DiGraph() # DAG
self.operators = {} # operator id --> operator object
self.config = config # Environment's configuration
self.topo_cleaned = False
# Handles to all actors in the physical dataflow
self.actor_handles = []
# Constructs and deploys a Ray actor of a specific type
# TODO (john): Actor placement information should be specified in
# the environment's configuration
def __generate_actor(self, instance_id, operator, input, output):
"""Generates an actor that will execute a particular instance of
the logical operator
Attributes:
instance_id (UUID): The id of the instance the actor will execute.
operator (Operator): The metadata of the logical operator.
input (DataInput): The input gate that manages input channels of
the instance (see: DataInput in communication.py).
input (DataOutput): The output gate that manages output channels
of the instance (see: DataOutput in communication.py).
"""
actor_id = (operator.id, instance_id)
# Record the physical dataflow graph (for debugging purposes)
self.__add_channel(actor_id, input, output)
# Select actor to construct
if operator.type == OpType.Source:
source = operator_instance.Source.remote(actor_id, operator, input,
output)
source.register_handle.remote(source)
return source.start.remote()
elif operator.type == OpType.Map:
map = operator_instance.Map.remote(actor_id, operator, input,
output)
map.register_handle.remote(map)
return map.start.remote()
elif operator.type == OpType.FlatMap:
flatmap = operator_instance.FlatMap.remote(actor_id, operator,
input, output)
flatmap.register_handle.remote(flatmap)
return flatmap.start.remote()
elif operator.type == OpType.Filter:
filter = operator_instance.Filter.remote(actor_id, operator, input,
output)
filter.register_handle.remote(filter)
return filter.start.remote()
elif operator.type == OpType.Reduce:
reduce = operator_instance.Reduce.remote(actor_id, operator, input,
output)
reduce.register_handle.remote(reduce)
return reduce.start.remote()
elif operator.type == OpType.TimeWindow:
pass
elif operator.type == OpType.KeyBy:
keyby = operator_instance.KeyBy.remote(actor_id, operator, input,
output)
keyby.register_handle.remote(keyby)
return keyby.start.remote()
elif operator.type == OpType.Sum:
sum = operator_instance.Reduce.remote(actor_id, operator, input,
output)
# Register target handle at state actor
state_actor = operator.state_actor
if state_actor is not None:
state_actor.register_target.remote(sum)
# Register own handle
sum.register_handle.remote(sum)
return sum.start.remote()
elif operator.type == OpType.Sink:
pass
elif operator.type == OpType.Inspect:
inspect = operator_instance.Inspect.remote(actor_id, operator,
input, output)
inspect.register_handle.remote(inspect)
return inspect.start.remote()
elif operator.type == OpType.ReadTextFile:
# TODO (john): Colocate the source with the input file
read = operator_instance.ReadTextFile.remote(
actor_id, operator, input, output)
read.register_handle.remote(read)
return read.start.remote()
else: # TODO (john): Add support for other types of operators
sys.exit("Unrecognized or unsupported {} operator type.".format(
operator.type))
# Constructs and deploys a Ray actor for each instance of
# the given operator
def __generate_actors(self, operator, upstream_channels,
downstream_channels):
"""Generates one actor for each instance of the given logical
operator.
Attributes:
operator (Operator): The logical operator metadata.
upstream_channels (list): A list of all upstream channels for
all instances of the operator.
downstream_channels (list): A list of all downstream channels
for all instances of the operator.
"""
num_instances = operator.num_instances
logger.info("Generating {} actors of type {}...".format(
num_instances, operator.type))
in_channels = upstream_channels.pop(
operator.id) if upstream_channels else []
handles = []
for i in range(num_instances):
# Collect input and output channels for the particular instance
ip = [
channel for channel in in_channels
if channel.dst_instance_id == i
] if in_channels else []
op = [
channel for channels_list in downstream_channels.values()
for channel in channels_list if channel.src_instance_id == i
]
log = "Constructed {} input and {} output channels "
log += "for the {}-th instance of the {} operator."
logger.debug(log.format(len(ip), len(op), i, operator.type))
input_gate = DataInput(ip)
output_gate = DataOutput(op, operator.partitioning_strategies)
handle = self.__generate_actor(i, operator, input_gate,
output_gate)
if handle:
handles.append(handle)
return handles
# Adds a channel/edge to the physical dataflow graph
def __add_channel(self, actor_id, input, output):
for dest_actor_id in output._destination_actor_ids():
self.physical_topo.add_edge(actor_id, dest_actor_id)
# Generates all required data channels between an operator
# and its downstream operators
def _generate_channels(self, operator):
"""Generates all output data channels
(see: DataChannel in communication.py) for all instances of
the given logical operator.
The function constructs one data channel for each pair of
communicating operator instances (instance_1,instance_2),
where instance_1 is an instance of the given operator and instance_2
is an instance of a direct downstream operator.
The number of total channels generated depends on the partitioning
strategy specified by the user.
"""
channels = {} # destination operator id -> channels
strategies = operator.partitioning_strategies
for dst_operator, p_scheme in strategies.items():
num_dest_instances = self.operators[dst_operator].num_instances
entry = channels.setdefault(dst_operator, [])
if p_scheme.strategy == PStrategy.Forward:
for i in range(operator.num_instances):
# ID of destination instance to connect
id = i % num_dest_instances
channel = DataChannel(self, operator.id, dst_operator, i,
id)
entry.append(channel)
elif p_scheme.strategy in all_to_all_strategies:
for i in range(operator.num_instances):
for j in range(num_dest_instances):
channel = DataChannel(self, operator.id, dst_operator,
i, j)
entry.append(channel)
else:
# TODO (john): Add support for other partitioning strategies
sys.exit("Unrecognized or unsupported partitioning strategy.")
return channels
# An edge denotes a flow of data between logical operators
# and may correspond to multiple data channels in the physical dataflow
def _add_edge(self, source, destination):
self.logical_topo.add_edge(source, destination)
# Cleans the logical dataflow graph to construct and
# deploy the physical dataflow
def _collect_garbage(self):
if self.topo_cleaned is True:
return
for node in self.logical_topo:
self.operators[node]._clean()
self.topo_cleaned = True
# Sets the level of parallelism for a registered operator
# Overwrites the environment parallelism (if set)
def _set_parallelism(self, operator_id, level_of_parallelism):
self.operators[operator_id].num_instances = level_of_parallelism
# Sets the same level of parallelism for all operators in the environment
def set_parallelism(self, parallelism):
self.config.parallelism = parallelism
# Sets batched queue configuration for the environment
def set_queue_config(self, queue_config):
self.config.queue_config = queue_config
# Creates and registers a user-defined data source
# TODO (john): There should be different types of sources, e.g. sources
# reading from Kafka, text files, etc.
# TODO (john): Handle case where environment parallelism is set
def source(self, source):
source_id = _generate_uuid()
source_stream = DataStream(self, source_id)
self.operators[source_id] = Operator(
source_id, OpType.Source, "Source", other=source)
return source_stream
# Creates and registers a new data source that reads a
# text file line by line
# TODO (john): There should be different types of sources,
# e.g. sources reading from Kafka, text files, etc.
# TODO (john): Handle case where environment parallelism is set
def read_text_file(self, filepath):
source_id = _generate_uuid()
source_stream = DataStream(self, source_id)
self.operators[source_id] = Operator(
source_id, OpType.ReadTextFile, "Read Text File", other=filepath)
return source_stream
# Constructs and deploys the physical dataflow
def execute(self):
"""Deploys and executes the physical dataflow."""
self._collect_garbage() # Make sure everything is clean
# TODO (john): Check if dataflow has any 'logical inconsistencies'
# For example, if there is a forward partitioning strategy but
# the number of downstream instances is larger than the number of
# upstream instances, some of the downstream instances will not be
# used at all
# Each operator instance is implemented as a Ray actor
# Actors are deployed in topological order, as we traverse the
# logical dataflow from sources to sinks. At each step, data
# producers wait for acknowledge from consumers before starting
# generating data.
upstream_channels = {}
for node in nx.topological_sort(self.logical_topo):
operator = self.operators[node]
# Generate downstream data channels
downstream_channels = self._generate_channels(operator)
# Instantiate Ray actors
handles = self.__generate_actors(operator, upstream_channels,
downstream_channels)
if handles:
self.actor_handles.extend(handles)
upstream_channels.update(downstream_channels)
logger.debug("Running...")
return self.actor_handles
# Prints the logical dataflow graph
def print_logical_graph(self):
self._collect_garbage()
logger.info("==================================")
logger.info("======Logical Dataflow Graph======")
logger.info("==================================")
# Print operators in topological order
for node in nx.topological_sort(self.logical_topo):
downstream_neighbors = list(self.logical_topo.neighbors(node))
logger.info("======Current Operator======")
operator = self.operators[node]
operator.print()
logger.info("======Downstream Operators======")
if len(downstream_neighbors) == 0:
logger.info("None\n")
for downstream_node in downstream_neighbors:
self.operators[downstream_node].print()
# Prints the physical dataflow graph
def print_physical_graph(self):
logger.info("===================================")
logger.info("======Physical Dataflow Graph======")
logger.info("===================================")
# Print all data channels between operator instances
log = "(Source Operator ID,Source Operator Name,Source Instance ID)"
log += " --> "
log += "(Destination Operator ID,Destination Operator Name,"
log += "Destination Instance ID)"
logger.info(log)
for src_actor_id, dst_actor_id in self.physical_topo.edges:
src_operator_id, src_instance_id = src_actor_id
dst_operator_id, dst_instance_id = dst_actor_id
logger.info("({},{},{}) --> ({},{},{})".format(
src_operator_id, self.operators[src_operator_id].name,
src_instance_id, dst_operator_id,
self.operators[dst_operator_id].name, dst_instance_id))
# TODO (john): We also need KeyedDataStream and WindowedDataStream as
# subclasses of DataStream to prevent ill-defined logical dataflows
# A DataStream corresponds to an edge in the logical dataflow
class DataStream(object):
"""A data stream.
This class contains all information about a logical stream, i.e. an edge
in the logical topology. It is the main class exposed to the user.
Attributes:
id (UUID): The id of the stream
env (Environment): The environment the stream belongs to.
src_operator_id (UUID): The id of the source operator of the stream.
dst_operator_id (UUID): The id of the destination operator of the
stream.
is_partitioned (bool): Denotes if there is a partitioning strategy
(e.g. shuffle) for the stream or not (default stategy: Forward).
"""
def __init__(self,
environment,
source_id=None,
dest_id=None,
is_partitioned=False):
self.id = _generate_uuid()
self.env = environment
self.src_operator_id = source_id
self.dst_operator_id = dest_id
# True if a partitioning strategy for this stream exists,
# false otherwise
self.is_partitioned = is_partitioned
# Generates a new stream after a data transformation is applied
def __expand(self):
stream = DataStream(self.env)
assert (self.dst_operator_id is not None)
stream.src_operator_id = self.dst_operator_id
stream.dst_operator_id = None
return stream
# Assigns the partitioning strategy to a new 'open-ended' stream
# and returns the stream. At this point, the partitioning strategy
# is not associated with any destination operator. We expect this to
# be done later, as we continue assembling the dataflow graph
def __partition(self, strategy, partition_fn=None):
scheme = PScheme(strategy, partition_fn)
source_operator = self.env.operators[self.src_operator_id]
new_stream = DataStream(
self.env, source_id=source_operator.id, is_partitioned=True)
source_operator._set_partition_strategy(new_stream.id, scheme)
return new_stream
# Registers the operator to the environment and returns a new
# 'open-ended' stream. The registered operator serves as the destination
# of the previously 'open' stream
def __register(self, operator):
"""Registers the given logical operator to the environment and
connects it to its upstream operator (if any).
A call to this function adds a new edge to the logical topology.
Attributes:
operator (Operator): The metadata of the logical operator.
"""
self.env.operators[operator.id] = operator
self.dst_operator_id = operator.id
logger.debug("Adding new dataflow edge ({},{}) --> ({},{})".format(
self.src_operator_id,
self.env.operators[self.src_operator_id].name,
self.dst_operator_id,
self.env.operators[self.dst_operator_id].name))
# Update logical dataflow graphs
self.env._add_edge(self.src_operator_id, self.dst_operator_id)
# Keep track of the partitioning strategy and the destination operator
src_operator = self.env.operators[self.src_operator_id]
if self.is_partitioned is True:
partitioning, _ = src_operator._get_partition_strategy(self.id)
src_operator._set_partition_strategy(_generate_uuid(),
partitioning, operator.id)
elif src_operator.type == OpType.KeyBy:
# Set the output partitioning strategy to shuffle by key
partitioning = PScheme(PStrategy.ShuffleByKey)
src_operator._set_partition_strategy(_generate_uuid(),
partitioning, operator.id)
else: # No partitioning strategy has been defined - set default
partitioning = PScheme(PStrategy.Forward)
src_operator._set_partition_strategy(_generate_uuid(),
partitioning, operator.id)
return self.__expand()
# Sets the level of parallelism for an operator, i.e. its total
# number of instances. Each operator instance corresponds to an actor
# in the physical dataflow
def set_parallelism(self, num_instances):
"""Sets the number of instances for the source operator of the stream.
Attributes:
num_instances (int): The level of parallelism for the source
operator of the stream.
"""
assert (num_instances > 0)
self.env._set_parallelism(self.src_operator_id, num_instances)
return self
# Stream Partitioning Strategies #
# TODO (john): Currently, only forward (default), shuffle,
# and broadcast are supported
# Hash-based record shuffling
def shuffle(self):
"""Registers a shuffling partitioning strategy for the stream."""
return self.__partition(PStrategy.Shuffle)
# Broadcasts each record to all downstream instances
def broadcast(self):
"""Registers a broadcast partitioning strategy for the stream."""
return self.__partition(PStrategy.Broadcast)
# Rescales load to downstream instances
def rescale(self):
"""Registers a rescale partitioning strategy for the stream.
Same as Flink's rescale (see: https://ci.apache.org/projects/flink/
flink-docs-stable/dev/stream/operators/#physical-partitioning).
"""
return self.__partition(PStrategy.Rescale)
# Round-robin partitioning
def round_robin(self):
"""Registers a round-robin partitioning strategy for the stream."""
return self.__partition(PStrategy.RoundRobin)
# User-defined partitioning
def partition(self, partition_fn):
"""Registers a user-defined partitioning strategy for the stream.
Attributes:
partition_fn (function): The user-defined partitioning function.
"""
return self.__partition(PStrategy.Custom, partition_fn)
# Data Trasnformations #
# TODO (john): Expand set of supported operators.
# TODO (john): To support event-time windows we need a mechanism for
# generating and processing watermarks
# Registers map operator to the environment
def map(self, map_fn, name="Map"):
"""Applies a map operator to the stream.
Attributes:
map_fn (function): The user-defined logic of the map.
"""
op = Operator(
_generate_uuid(),
OpType.Map,
name,
map_fn,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers flatmap operator to the environment
def flat_map(self, flatmap_fn):
"""Applies a flatmap operator to the stream.
Attributes:
flatmap_fn (function): The user-defined logic of the flatmap
(e.g. split()).
"""
op = Operator(
_generate_uuid(),
OpType.FlatMap,
"FlatMap",
flatmap_fn,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers keyBy operator to the environment
# TODO (john): This should returned a KeyedDataStream
def key_by(self, key_selector):
"""Applies a key_by operator to the stream.
Attributes:
key_attribute_index (int): The index of the key attributed
(assuming tuple records).
"""
op = Operator(
_generate_uuid(),
OpType.KeyBy,
"KeyBy",
other=key_selector,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers Reduce operator to the environment
def reduce(self, reduce_fn):
"""Applies a rolling sum operator to the stream.
Attributes:
sum_attribute_index (int): The index of the attribute to sum
(assuming tuple records).
"""
op = Operator(
_generate_uuid(),
OpType.Reduce,
"Sum",
reduce_fn,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers Sum operator to the environment
def sum(self, attribute_selector, state_keeper=None):
"""Applies a rolling sum operator to the stream.
Attributes:
sum_attribute_index (int): The index of the attribute to sum
(assuming tuple records).
"""
op = Operator(
_generate_uuid(),
OpType.Sum,
"Sum",
_sum,
other=attribute_selector,
state_actor=state_keeper,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers window operator to the environment.
# This is a system time window
# TODO (john): This should return a WindowedDataStream
def time_window(self, window_width_ms):
"""Applies a system time window to the stream.
Attributes:
window_width_ms (int): The length of the window in ms.
"""
op = Operator(
_generate_uuid(),
OpType.TimeWindow,
"TimeWindow",
num_instances=self.env.config.parallelism,
other=window_width_ms)
return self.__register(op)
# Registers filter operator to the environment
def filter(self, filter_fn):
"""Applies a filter to the stream.
Attributes:
filter_fn (function): The user-defined filter function.
"""
op = Operator(
_generate_uuid(),
OpType.Filter,
"Filter",
filter_fn,
num_instances=self.env.config.parallelism)
return self.__register(op)
# TODO (john): Registers window join operator to the environment
def window_join(self, other_stream, join_attribute, window_width):
op = Operator(
_generate_uuid(),
OpType.WindowJoin,
"WindowJoin",
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers inspect operator to the environment
def inspect(self, inspect_logic):
"""Inspects the content of the stream.
Attributes:
inspect_logic (function): The user-defined inspect function.
"""
op = Operator(
_generate_uuid(),
OpType.Inspect,
"Inspect",
inspect_logic,
num_instances=self.env.config.parallelism)
return self.__register(op)
# Registers sink operator to the environment
# TODO (john): A sink now just drops records but it should be able to
# export data to other systems
def sink(self):
"""Closes the stream with a sink operator."""
op = Operator(
_generate_uuid(),
OpType.Sink,
"Sink",
num_instances=self.env.config.parallelism)
return self.__register(op)
+66
View File
@@ -0,0 +1,66 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import time
import ray
from ray.experimental.streaming.batched_queue import BatchedQueue
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=2)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@ray.remote
class Reader(object):
def __init__(self, queue):
self.queue = queue
self.num_reads = 0
self.start = time.time()
def read(self, read_slowly):
expected_value = 0
for _ in range(1000):
x = self.queue.read_next()
assert x == expected_value, (x, expected_value)
expected_value += 1
self.num_reads += 1
if read_slowly:
time.sleep(0.001)
def test_batched_queue(ray_start):
# Batched queue parameters
max_queue_size = 10000 # Max number of batches in queue
max_batch_size = 1000 # Max number of elements per batch
batch_timeout = 0.001 # 1ms flush timeout
prefetch_depth = 10 # Number of batches to prefetch from plasma
background_flush = False # Don't use daemon thread for flushing
# Two tests: one with a big queue and slow reader, and
# a second one with a small queue and a faster reader
for read_slowly in [True, False]:
# Construct the batched queue
queue = BatchedQueue(
max_size=max_queue_size,
max_batch_size=max_batch_size,
max_batch_time=batch_timeout,
prefetch_depth=prefetch_depth,
background_flush=background_flush)
# Create and start the reader
reader = Reader.remote(queue)
object_id = reader.read.remote(read_slowly=read_slowly)
value = 0
for _ in range(1000):
queue.put_next(value)
value += 1
queue._flush_writes()
ray.get(object_id)
# Test once more with a very small queue size and a faster reader
max_queue_size = 10
+204
View File
@@ -0,0 +1,204 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.experimental.streaming.streaming import Environment
from ray.experimental.streaming.operator import OpType, PStrategy
def test_parallelism():
"""Tests operator parallelism."""
env = Environment()
# Try setting a common parallelism for all operators
env.set_parallelism(2)
stream = env.source(None).map(None).filter(None).flat_map(None)
env._collect_garbage()
for operator in env.operators.values():
if operator.type == OpType.Source:
# TODO (john): Currently each source has only one instance
assert operator.num_instances == 1, (operator.num_instances, 1)
else:
assert operator.num_instances == 2, (operator.num_instances, 2)
# Check again after adding an operator with different parallelism
stream.map(None, "Map1").shuffle().set_parallelism(3).map(
None, "Map2").set_parallelism(4)
env._collect_garbage()
for operator in env.operators.values():
if operator.type == OpType.Source:
assert operator.num_instances == 1, (operator.num_instances, 1)
elif operator.name != "Map1" and operator.name != "Map2":
assert operator.num_instances == 2, (operator.num_instances, 2)
elif operator.name != "Map2":
assert operator.num_instances == 3, (operator.num_instances, 3)
else:
assert operator.num_instances == 4, (operator.num_instances, 4)
def test_partitioning():
"""Tests stream partitioning."""
env = Environment()
# Try defining multiple partitioning strategies for the same stream
_ = env.source(None).shuffle().rescale().broadcast().map(
None).broadcast().shuffle()
env._collect_garbage()
for operator in env.operators.values():
p_schemes = operator.partitioning_strategies
for scheme in p_schemes.values():
# Only last defined strategy should be kept
if operator.type == OpType.Source:
assert scheme.strategy == PStrategy.Broadcast, (
scheme.strategy, PStrategy.Broadcast)
else:
assert scheme.strategy == PStrategy.Shuffle, (
scheme.strategy, PStrategy.Shuffle)
def test_forking():
"""Tests stream forking."""
env = Environment()
# Try forking a stream
stream = env.source(None).map(None).set_parallelism(2)
# First branch with a shuffle partitioning strategy
_ = stream.shuffle().key_by(0).sum(1)
# Second branch with the default partitioning strategy
_ = stream.key_by(1).sum(2)
env._collect_garbage()
# Operator ids
source_id = None
map_id = None
keyby1_id = None
keyby2_id = None
sum1_id = None
sum2_id = None
# Collect ids
for id, operator in env.operators.items():
if operator.type == OpType.Source:
source_id = id
elif operator.type == OpType.Map:
map_id = id
elif operator.type == OpType.KeyBy:
if operator.other_args == 0:
keyby1_id = id
else:
assert operator.other_args == 1, (operator.other_args, 1)
keyby2_id = id
elif operator.type == OpType.Sum:
if operator.other_args == 1:
sum1_id = id
else:
assert operator.other_args == 2, (operator.other_args, 2)
sum2_id = id
# Check generated streams and their partitioning
for source, destination in env.logical_topo.edges:
operator = env.operators[source]
if source == source_id:
assert destination == map_id, (destination, map_id)
elif source == map_id:
p_scheme = operator.partitioning_strategies[destination]
strategy = p_scheme.strategy
key_index = env.operators[destination].other_args
if key_index == 0: # This must be the first branch
assert strategy == PStrategy.Shuffle, (strategy,
PStrategy.Shuffle)
assert destination == keyby1_id, (destination, keyby1_id)
else: # This must be the second branch
assert key_index == 1, (key_index, 1)
assert strategy == PStrategy.Forward, (strategy,
PStrategy.Forward)
assert destination == keyby2_id, (destination, keyby2_id)
elif source == keyby1_id or source == keyby2_id:
p_scheme = operator.partitioning_strategies[destination]
strategy = p_scheme.strategy
key_index = env.operators[destination].other_args
if key_index == 1: # This must be the first branch
assert strategy == PStrategy.ShuffleByKey, (
strategy, PStrategy.ShuffleByKey)
assert destination == sum1_id, (destination, sum1_id)
else: # This must be the second branch
assert key_index == 2, (key_index, 2)
assert strategy == PStrategy.ShuffleByKey, (
strategy, PStrategy.ShuffleByKey)
assert destination == sum2_id, (destination, sum2_id)
else: # This must be a sum operator
assert operator.type == OpType.Sum, (operator.type, OpType.Sum)
def _test_shuffle_channels():
"""Tests shuffling connectivity."""
env = Environment()
# Try defining a shuffle
_ = env.source(None).shuffle().map(None).set_parallelism(4)
expected = [(0, 0), (0, 1), (0, 2), (0, 3)]
_test_channels(env, expected)
def _test_forward_channels():
"""Tests forward connectivity."""
env = Environment()
# Try the default partitioning strategy
_ = env.source(None).set_parallelism(4).map(None).set_parallelism(2)
expected = [(0, 0), (1, 1), (2, 0), (3, 1)]
_test_channels(env, expected)
def _test_broadcast_channels():
"""Tests broadcast connectivity."""
env = Environment()
# Try broadcasting
_ = env.source(None).set_parallelism(4).broadcast().map(
None).set_parallelism(2)
expected = [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1), (3, 0), (3, 1)]
_test_channels(env, expected)
def _test_round_robin_channels():
"""Tests round-robin connectivity."""
env = Environment()
# Try broadcasting
_ = env.source(None).round_robin().map(None).set_parallelism(2)
expected = [(0, 0), (0, 1)]
_test_channels(env, expected)
def _test_channels(environment, expected_channels):
"""Tests operator connectivity."""
environment._collect_garbage()
map_id = None
# Get id
for id, operator in environment.operators.items():
if operator.type == OpType.Map:
map_id = id
# Collect channels
channels_per_destination = []
for operator in environment.operators.values():
channels_per_destination.append(
environment._generate_channels(operator))
# Check actual connectivity
actual = []
for destination in channels_per_destination:
for channels in destination.values():
for channel in channels:
src_instance_id = channel.src_instance_id
dst_instance_id = channel.dst_instance_id
connection = (src_instance_id, dst_instance_id)
assert channel.dst_operator_id == map_id, (
channel.dst_operator_id, map_id)
actual.append(connection)
# Make sure connections are as expected
set_1 = set(expected_channels)
set_2 = set(actual)
assert set_1 == set_2, (set_1, set_2)
def test_channel_generation():
"""Tests data channel generation."""
_test_shuffle_channels()
_test_broadcast_channels()
_test_round_robin_channels()
_test_forward_channels()
# TODO (john): Add simple wordcount test
def test_wordcount():
"""Tests a simple streaming wordcount."""
pass