Make xray object table credis-managed and hence flushable. (#2338)

* monitor.py: issue flushes to data shard

* ResultTableAdd & ObjectTableAdd: add credis-managed versions

* Fix return codes

* Credis-manage xray object table & associated ray.table_append cmd

* Fix incorrect return code from TableAppend_DoWrite()

* Revert "ResultTableAdd & ObjectTableAdd: add credis-managed versions"

This reverts commit 628c2ea190df4c861dda0c284fab7ca6faa1ea24.

* Address comments

* Lint: fix indent

* Address comment
This commit is contained in:
Zongheng Yang
2018-07-03 17:32:44 -07:00
committed by Philipp Moritz
parent f21d783e6d
commit ba28dddf6f
6 changed files with 158 additions and 76 deletions
+19 -9
View File
@@ -111,14 +111,24 @@ class Monitor(object):
self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ
self.gcs_flush_policy = None
if self.issue_gcs_flushes:
# For now, we take the primary redis server to issue flushes,
# because task table entries are stored there under this flag.
try:
self.redis.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info("Turning off flushing due to exception: {}".format(
str(e)))
# Data is stored under the first data shard, so we issue flushes to
# that redis server.
addr_port = self.redis.lrange("RedisShards", 0, -1)
if len(addr_port) > 1:
log.warning("TODO: if launching > 1 redis shard, flushing "
"needs to touch shards in parallel.")
self.issue_gcs_flushes = False
else:
addr_port = addr_port[0].split(b":")
self.redis_shard = redis.StrictRedis(
host=addr_port[0], port=addr_port[1])
try:
self.redis_shard.execute_command("HEAD.FLUSH 0")
except redis.exceptions.ResponseError as e:
log.info(
"Turning off flushing due to exception: {}".format(
str(e)))
self.issue_gcs_flushes = False
def subscribe(self, channel):
"""Subscribe to the given channel.
@@ -562,11 +572,11 @@ class Monitor(object):
return
self.gcs_flush_policy = pickle.loads(serialized)
if not self.gcs_flush_policy.should_flush(self.redis):
if not self.gcs_flush_policy.should_flush(self.redis_shard):
return
max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush()
num_flushed = self.redis.execute_command(
num_flushed = self.redis_shard.execute_command(
"HEAD.FLUSH {}".format(max_entries_to_flush))
log.info('num_flushed {}'.format(num_flushed))