From 0a556dc0b581cc1c2a7f91b3b61c28090ce990fc Mon Sep 17 00:00:00 2001 From: Wang Qing Date: Sat, 12 Jan 2019 23:01:48 +0800 Subject: [PATCH] Refine redis client (#3758) --- .../org/ray/runtime/RayNativeRuntime.java | 11 +- .../java/org/ray/runtime/gcs/AddressInfo.java | 18 -- .../ray/runtime/gcs/KeyValueStoreLink.java | 138 ------------- .../java/org/ray/runtime/gcs/RedisClient.java | 190 ++---------------- .../org/ray/runtime/gcs/StateStoreProxy.java | 36 ---- .../ray/runtime/gcs/StateStoreProxyImpl.java | 163 --------------- 6 files changed, 20 insertions(+), 536 deletions(-) delete mode 100644 java/runtime/src/main/java/org/ray/runtime/gcs/AddressInfo.java delete mode 100644 java/runtime/src/main/java/org/ray/runtime/gcs/KeyValueStoreLink.java delete mode 100644 java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxy.java delete mode 100644 java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxyImpl.java diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 386202d38..8374ffdcc 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -5,11 +5,8 @@ import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; -import org.apache.arrow.plasma.ObjectStoreLink; -import org.apache.arrow.plasma.PlasmaClient; import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.WorkerMode; -import org.ray.runtime.gcs.KeyValueStoreLink; import org.ray.runtime.gcs.RedisClient; import org.ray.runtime.objectstore.ObjectStoreProxy; import org.ray.runtime.raylet.RayletClientImpl; @@ -24,7 +21,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { private static final Logger LOGGER = LoggerFactory.getLogger(RayNativeRuntime.class); - private KeyValueStoreLink kvStore = null; + private RedisClient redisClient = null; private RunManager manager = null; public RayNativeRuntime(RayConfig rayConfig) { @@ -72,7 +69,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { manager = new RunManager(rayConfig); manager.startRayProcesses(true); } - kvStore = new RedisClient(rayConfig.getRedisAddress()); + redisClient = new RedisClient(rayConfig.getRedisAddress()); objectStoreProxy = new ObjectStoreProxy(this, rayConfig.objectStoreSocketName); @@ -108,13 +105,13 @@ public final class RayNativeRuntime extends AbstractRayRuntime { workerInfo.put("raylet_socket", rayConfig.rayletSocketName); workerInfo.put("name", System.getProperty("user.dir")); //TODO: worker.redis_client.hmset(b"Drivers:" + worker.workerId, driver_info) - kvStore.hmset("Drivers:" + workerId, workerInfo); + redisClient.hmset("Drivers:" + workerId, workerInfo); } else { workerInfo.put("node_ip_address", rayConfig.nodeIp); workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName); workerInfo.put("raylet_socket", rayConfig.rayletSocketName); //TODO: b"Workers:" + worker.workerId, - kvStore.hmset("Workers:" + workerId, workerInfo); + redisClient.hmset("Workers:" + workerId, workerInfo); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/AddressInfo.java b/java/runtime/src/main/java/org/ray/runtime/gcs/AddressInfo.java deleted file mode 100644 index 16e3706f2..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/AddressInfo.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.ray.runtime.gcs; - -/** - * Represents information of different process roles. - */ -public class AddressInfo { - - public String managerName; - public String storeName; - public String schedulerName; - public String rayletSocketName; - public int managerPort; - public int workerCount; - public String managerRpcAddr; - public String storeRpcAddr; - public String schedulerRpcAddr; - public String rayletRpcAddr; -} diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/KeyValueStoreLink.java b/java/runtime/src/main/java/org/ray/runtime/gcs/KeyValueStoreLink.java deleted file mode 100644 index afa033089..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/KeyValueStoreLink.java +++ /dev/null @@ -1,138 +0,0 @@ -package org.ray.runtime.gcs; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Ray K/V abstraction. - */ -public interface KeyValueStoreLink { - - /** - * set address of kv store: format "ip:port". - */ - void setAddr(String addr); - - /** - * check if the kvstore client connected. - */ - void checkConnected() throws Exception; - - /** - * set Key-value into State Store, such as redis. - * - * @param key the key to set - * @param value the value to set - * @param field the field is being set when the item is a hash If it is not hash field should be - * filled with null - * @return If the key(or field) already exists, and the StateStoreSet just produced an update of - * the value, 0 is returned, otherwise if a new key(or field) is created 1 is returned. - */ - Long set(final String key, final String value, final String field); - - Long set(final byte[] key, final byte[] value, final byte[] field); - - /** - * multi hash value set. - * - * @param key the key in kvStore - * @param hash the multi hash value to be set - * @return Return OK or Exception if hash is empty - */ - String hmset(final String key, final Map hash); - - String hmset(final byte[] key, final Map hash); - - /** - * multi hash value get. - * - * @param key the key in kvStore - * @param fields the fields to be get - * @return Multi Bulk Reply specifically a list of all the values associated with the specified - * fields, in the same order of the request. - */ - List hmget(final String key, final String... fields); - - List hmget(final byte[] key, final byte[]... fields); - - /** - * get the value of the specified key from State Store. - * - * @param key the key to get - * @param field the field is being got when the item is a hash If it is not hash field should be - * filled with null - * @return Bulk reply If the key does not exist null is returned. - */ - String get(final String key, final String field); - - byte[] get(final byte[] key, final byte[] field); - - /** - * delete the key(or the specified field of the key) from State Store. - * - * @param key the key to delete - * @param field the field is to delete when the item is a hash If it is not hash field should be - * filled with null - * @return Integer reply, specifically: an integer greater than 0 if the key(or the field) was - * removed 0 if none of the specified key existed - */ - Long delete(final String key, final String field); - - Long delete(final byte[] key, final byte[] field); - - /** - * get all keys which fit the pattern. - */ - Set keys(final byte[] pattern); - - /** - * get all keys which fit the pattern. - */ - Set keys(String pattern); - - /** - * get all hash of the key. - */ - Map hgetAll(final byte[] key); - - /** - * Return the specified elements of the list stored at the specified key. - * - * @return Multi bulk reply, specifically a list of elements in the specified range. - */ - List lrange(final String key, final long start, final long end); - - /** - * Return the set of elements of the sorted set stored at the specified key. - * @param key The specified key you want to query. - * @param start The start index of the range. - * @param end The end index of the range. - * @return The set of elements you queried. - */ - Set zrange(byte[] key, long start, long end); - - /** - * Rpush. - * @return Integer reply, specifically, the number of elements inside the list after the push - * operation. - */ - Long rpush(final String key, final String... strings); - - Long rpush(final byte[] key, final byte[]... strings); - - /** - * Publish. - * @param channel To which channel the message will be published - * @param message What to publish - * @return the number of clients that received the message - */ - Long publish(final String channel, final String message); - - Long publish(byte[] channel, byte[] message); - - Object getImpl(); - - byte[] sendCommand(String command, int commandType, byte[] objectId); - -} diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java index 55d8bef8a..4997bb55a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java @@ -1,46 +1,32 @@ package org.ray.runtime.gcs; -import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; -public class RedisClient implements KeyValueStoreLink { +/** + * Redis client class. + */ +public class RedisClient { + + private static final int JEDIS_POOL_SIZE = 1; - private String redisAddress; private JedisPool jedisPool; - private int handle = 0; - public RedisClient() { - } - - public RedisClient(String addr) { - setAddr(addr); - } - - @Override - public synchronized void setAddr(String addr) { - if (StringUtils.isEmpty(redisAddress)) { - redisAddress = addr; - String[] ipPort = addr.split(":"); - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - //TODO NUM maybe equels to the thread num - jedisPoolConfig.setMaxTotal(1); - jedisPool = new JedisPool(jedisPoolConfig, ipPort[0], Integer.parseInt(ipPort[1]), 30000); + public RedisClient(String redisAddress) { + String[] ipAndPort = redisAddress.split(":"); + if (ipAndPort.length != 2) { + throw new IllegalArgumentException("The argument redisAddress " + + "should be formatted as ip:port."); } + + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal(JEDIS_POOL_SIZE); + jedisPool = new JedisPool(jedisPoolConfig, ipAndPort[0], + Integer.parseInt(ipAndPort[1]), 30000); } - @Override - public void checkConnected() throws Exception { - if (jedisPool == null) { - throw new Exception("the GlobalState API can't be used before ray init."); - } - } - - @Override public Long set(final String key, final String value, final String field) { try (Jedis jedis = jedisPool.getResource()) { if (field == null) { @@ -53,20 +39,6 @@ public class RedisClient implements KeyValueStoreLink { } - @Override - public Long set(byte[] key, byte[] value, byte[] field) { - try (Jedis jedis = jedisPool.getResource()) { - if (field == null) { - jedis.set(key, value); - return (long) 1; - } else { - return jedis.hset(key, field, value); - } - } - - } - - @Override public String hmset(String key, Map hash) { try (Jedis jedis = jedisPool.getResource()) { return jedis.hmset(key, hash); @@ -74,28 +46,6 @@ public class RedisClient implements KeyValueStoreLink { } - @Override - public String hmset(byte[] key, Map hash) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.hmset(key, hash); - } - } - - @Override - public List hmget(String key, String... fields) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.hmget(key, fields); - } - } - - @Override - public List hmget(byte[] key, byte[]... fields) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.hmget(key, fields); - } - } - - @Override public String get(final String key, final String field) { try (Jedis jedis = jedisPool.getResource()) { if (field == null) { @@ -107,7 +57,6 @@ public class RedisClient implements KeyValueStoreLink { } - @Override public byte[] get(byte[] key, byte[] field) { try (Jedis jedis = jedisPool.getResource()) { if (field == null) { @@ -119,111 +68,4 @@ public class RedisClient implements KeyValueStoreLink { } - @Override - public Long delete(final String key, final String field) { - try (Jedis jedis = jedisPool.getResource()) { - if (field == null) { - return jedis.del(key); - } else { - return jedis.hdel(key, field); - } - } - - } - - @Override - public Long delete(byte[] key, byte[] field) { - try (Jedis jedis = jedisPool.getResource()) { - if (field == null) { - return jedis.del(key); - } else { - return jedis.hdel(key, field); - } - } - - } - - @Override - public Set keys(byte[] pattern) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.keys(pattern); - } - } - - @Override - public Set keys(String pattern) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.keys(pattern); - } - } - - @Override - public Map hgetAll(byte[] key) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.hgetAll(key); - } - } - - @Override - public List lrange(String key, long start, long end) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.lrange(key, start, end); - } - } - - @Override - public Set zrange(byte[] key, long start, long end) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.zrange(key, start, end); - } - } - - @Override - public Long rpush(String key, String... strings) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.rpush(key, strings); - } - } - - @Override - public Long rpush(byte[] key, byte[]... strings) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.rpush(key, strings); - } - } - - @Override - public Long publish(String channel, String message) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.publish(channel, message); - } - } - - @Override - public Long publish(byte[] channel, byte[] message) { - try (Jedis jedis = jedisPool.getResource()) { - return jedis.publish(channel, message); - } - } - - @Override - public Object getImpl() { - return jedisPool; - } - - @Override - public byte[] sendCommand(String command, int commandType, byte[] objectId) { - if (handle == 0) { - String[] ipPort = redisAddress.split(":"); - handle = connect(ipPort[0], Integer.parseInt(ipPort[1])); - } - return execute_command(handle, command, commandType, objectId); - } - - private static native int connect(String redisAddress, int port); - - private static native void disconnect(int handle); - - private static native byte[] execute_command(int handle, - String command, int commandType, byte[] objectId); } diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxy.java b/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxy.java deleted file mode 100644 index 36d1eaf67..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxy.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.ray.runtime.gcs; - -import java.util.List; -import java.util.Set; - -/** - * Proxy client for state store, for instance redis. - */ -public interface StateStoreProxy { - - /** - * setStore. - * @param rayKvStore the underlying kv store used to store states - */ - void setStore(KeyValueStoreLink rayKvStore); - - - /** - * initialize the store. - */ - void initializeGlobalState() throws Exception; - - /** - * keys. - * @param pattern filter which keys you are interested in. - */ - Set keys(final String pattern); - - /** - * getAddressInfo. - * @return list of address information - */ - List getAddressInfo(final String nodeIpAddress, - final String redisAddress, - int numRetries); -} diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxyImpl.java b/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxyImpl.java deleted file mode 100644 index fe74adc0f..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/StateStoreProxyImpl.java +++ /dev/null @@ -1,163 +0,0 @@ -package org.ray.runtime.gcs; - -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import org.ray.api.id.UniqueId; -import org.ray.runtime.generated.ClientTableData; -import org.ray.runtime.util.NetworkUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A class used to interface with the Ray control state. - */ -public class StateStoreProxyImpl implements StateStoreProxy { - - private static final Logger LOGGER = LoggerFactory.getLogger(StateStoreProxyImpl.class); - public KeyValueStoreLink rayKvStore; - public ArrayList shardStoreList = new ArrayList<>(); - - public StateStoreProxyImpl(KeyValueStoreLink rayKvStore) { - this.rayKvStore = rayKvStore; - } - - @Override - public void setStore(KeyValueStoreLink rayKvStore) { - this.rayKvStore = rayKvStore; - } - - @Override - public synchronized void initializeGlobalState() throws Exception { - - String es; - - checkConnected(); - - String s = rayKvStore.get("NumRedisShards", null); - if (s == null) { - throw new Exception("NumRedisShards not found in redis."); - } - int numRedisShards = Integer.parseInt(s); - if (numRedisShards < 1) { - es = String.format("Expected at least one Redis shard, found %d", numRedisShards); - throw new Exception(es); - } - List ipAddressPorts = rayKvStore.lrange("RedisShards", 0, -1); - Set distinctIpAddress = new HashSet(ipAddressPorts); - if (distinctIpAddress.size() != numRedisShards) { - es = String.format("Expected %d Redis shard addresses, found2 %d.", numRedisShards, - distinctIpAddress.size()); - throw new Exception(es); - } - - shardStoreList.clear(); - for (String ipPort : distinctIpAddress) { - shardStoreList.add(new RedisClient(ipPort)); - } - - } - - public void checkConnected() throws Exception { - rayKvStore.checkConnected(); - } - - @Override - public synchronized Set keys(final String pattern) { - Set allKeys = new HashSet<>(); - Set tmpKey; - for (KeyValueStoreLink ashardStoreList : shardStoreList) { - tmpKey = ashardStoreList.keys(pattern); - allKeys.addAll(tmpKey); - } - - return allKeys; - } - - @Override - public List getAddressInfo(final String nodeIpAddress, - final String redisAddress, - int numRetries) { - int count = 0; - while (count < numRetries) { - try { - return doGetAddressInfo(nodeIpAddress, redisAddress); - } catch (Exception e) { - try { - LOGGER.warn("Error occurred in StateStoreProxyImpl getAddressInfo, {} retries remaining", - (numRetries - count), e); - TimeUnit.MILLISECONDS.sleep(1000); - } catch (InterruptedException ie) { - LOGGER.error("error at StateStoreProxyImpl getAddressInfo", e); - throw new RuntimeException(e); - } - } - count++; - } - throw new RuntimeException("cannot get address info from state store"); - } - - /** - * Get address info of one node from primary redis. - * This method only tries to get address info once, without any retry. - * - * @param nodeIpAddress Usually local ip address. - * @param redisAddress The primary redis address. - * @return A list of SchedulerInfo which contains node manager or local scheduler address info. - * @throws Exception No redis client exception. - */ - public List doGetAddressInfo(final String nodeIpAddress, - final String redisAddress) throws Exception { - if (this.rayKvStore == null) { - throw new Exception("no redis client when use doGetAddressInfo"); - } - List schedulerInfo = new ArrayList<>(); - - byte[] prefix = "CLIENT".getBytes(); - byte[] postfix = UniqueId.genNil().getBytes(); - byte[] clientKey = new byte[prefix.length + postfix.length]; - System.arraycopy(prefix, 0, clientKey, 0, prefix.length); - System.arraycopy(postfix, 0, clientKey, prefix.length, postfix.length); - - Set clients = rayKvStore.zrange(clientKey, 0, -1); - - for (byte[] clientMessage : clients) { - ByteBuffer bb = ByteBuffer.wrap(clientMessage); - ClientTableData client = ClientTableData.getRootAsClientTableData(bb); - String clientNodeIpAddress = client.nodeManagerAddress(); - - String localIpAddress = NetworkUtil.getIpAddress(null); - String redisIpAddress = redisAddress.substring(0, redisAddress.indexOf(':')); - - boolean headNodeAddress = "127.0.0.1".equals(clientNodeIpAddress) - && Objects.equals(redisIpAddress, localIpAddress); - boolean notHeadNodeAddress = Objects.equals(clientNodeIpAddress, nodeIpAddress); - - if (headNodeAddress || notHeadNodeAddress) { - AddressInfo si = new AddressInfo(); - si.storeName = client.objectStoreSocketName(); - si.rayletSocketName = client.rayletSocketName(); - si.managerRpcAddr = client.nodeManagerAddress(); - si.managerPort = client.nodeManagerPort(); - schedulerInfo.add(si); - } - } - return schedulerInfo; - } - - protected String charsetDecode(byte[] bs, String charset) throws UnsupportedEncodingException { - return new String(bs, charset); - } - - protected byte[] charsetEncode(String str, String charset) throws UnsupportedEncodingException { - if (str != null) { - return str.getBytes(charset); - } - return null; - } -}