Refine redis client (#3758)

This commit is contained in:
Wang Qing
2019-01-12 23:01:48 +08:00
committed by Hao Chen
parent a0cf8ee5a8
commit 0a556dc0b5
6 changed files with 20 additions and 536 deletions
@@ -5,11 +5,8 @@ import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.arrow.plasma.PlasmaClient;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.gcs.KeyValueStoreLink;
import org.ray.runtime.gcs.RedisClient;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.raylet.RayletClientImpl;
@@ -24,7 +21,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
private static final Logger LOGGER = LoggerFactory.getLogger(RayNativeRuntime.class);
private KeyValueStoreLink kvStore = null;
private RedisClient redisClient = null;
private RunManager manager = null;
public RayNativeRuntime(RayConfig rayConfig) {
@@ -72,7 +69,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
manager = new RunManager(rayConfig);
manager.startRayProcesses(true);
}
kvStore = new RedisClient(rayConfig.getRedisAddress());
redisClient = new RedisClient(rayConfig.getRedisAddress());
objectStoreProxy = new ObjectStoreProxy(this, rayConfig.objectStoreSocketName);
@@ -108,13 +105,13 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
workerInfo.put("raylet_socket", rayConfig.rayletSocketName);
workerInfo.put("name", System.getProperty("user.dir"));
//TODO: worker.redis_client.hmset(b"Drivers:" + worker.workerId, driver_info)
kvStore.hmset("Drivers:" + workerId, workerInfo);
redisClient.hmset("Drivers:" + workerId, workerInfo);
} else {
workerInfo.put("node_ip_address", rayConfig.nodeIp);
workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName);
workerInfo.put("raylet_socket", rayConfig.rayletSocketName);
//TODO: b"Workers:" + worker.workerId,
kvStore.hmset("Workers:" + workerId, workerInfo);
redisClient.hmset("Workers:" + workerId, workerInfo);
}
}
@@ -1,18 +0,0 @@
package org.ray.runtime.gcs;
/**
* Represents information of different process roles.
*/
public class AddressInfo {
public String managerName;
public String storeName;
public String schedulerName;
public String rayletSocketName;
public int managerPort;
public int workerCount;
public String managerRpcAddr;
public String storeRpcAddr;
public String schedulerRpcAddr;
public String rayletRpcAddr;
}
@@ -1,138 +0,0 @@
package org.ray.runtime.gcs;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Ray K/V abstraction.
*/
public interface KeyValueStoreLink {
/**
* set address of kv store: format "ip:port".
*/
void setAddr(String addr);
/**
* check if the kvstore client connected.
*/
void checkConnected() throws Exception;
/**
* set Key-value into State Store, such as redis.
*
* @param key the key to set
* @param value the value to set
* @param field the field is being set when the item is a hash If it is not hash field should be
* filled with null
* @return If the key(or field) already exists, and the StateStoreSet just produced an update of
* the value, 0 is returned, otherwise if a new key(or field) is created 1 is returned.
*/
Long set(final String key, final String value, final String field);
Long set(final byte[] key, final byte[] value, final byte[] field);
/**
* multi hash value set.
*
* @param key the key in kvStore
* @param hash the multi hash value to be set
* @return Return OK or Exception if hash is empty
*/
String hmset(final String key, final Map<String, String> hash);
String hmset(final byte[] key, final Map<byte[], byte[]> hash);
/**
* multi hash value get.
*
* @param key the key in kvStore
* @param fields the fields to be get
* @return Multi Bulk Reply specifically a list of all the values associated with the specified
* fields, in the same order of the request.
*/
List<String> hmget(final String key, final String... fields);
List<byte[]> hmget(final byte[] key, final byte[]... fields);
/**
* get the value of the specified key from State Store.
*
* @param key the key to get
* @param field the field is being got when the item is a hash If it is not hash field should be
* filled with null
* @return Bulk reply If the key does not exist null is returned.
*/
String get(final String key, final String field);
byte[] get(final byte[] key, final byte[] field);
/**
* delete the key(or the specified field of the key) from State Store.
*
* @param key the key to delete
* @param field the field is to delete when the item is a hash If it is not hash field should be
* filled with null
* @return Integer reply, specifically: an integer greater than 0 if the key(or the field) was
* removed 0 if none of the specified key existed
*/
Long delete(final String key, final String field);
Long delete(final byte[] key, final byte[] field);
/**
* get all keys which fit the pattern.
*/
Set<byte[]> keys(final byte[] pattern);
/**
* get all keys which fit the pattern.
*/
Set<String> keys(String pattern);
/**
* get all hash of the key.
*/
Map<byte[], byte[]> hgetAll(final byte[] key);
/**
* Return the specified elements of the list stored at the specified key.
*
* @return Multi bulk reply, specifically a list of elements in the specified range.
*/
List<String> lrange(final String key, final long start, final long end);
/**
* Return the set of elements of the sorted set stored at the specified key.
* @param key The specified key you want to query.
* @param start The start index of the range.
* @param end The end index of the range.
* @return The set of elements you queried.
*/
Set<byte[]> zrange(byte[] key, long start, long end);
/**
* Rpush.
* @return Integer reply, specifically, the number of elements inside the list after the push
* operation.
*/
Long rpush(final String key, final String... strings);
Long rpush(final byte[] key, final byte[]... strings);
/**
* Publish.
* @param channel To which channel the message will be published
* @param message What to publish
* @return the number of clients that received the message
*/
Long publish(final String channel, final String message);
Long publish(byte[] channel, byte[] message);
Object getImpl();
byte[] sendCommand(String command, int commandType, byte[] objectId);
}
@@ -1,46 +1,32 @@
package org.ray.runtime.gcs;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisClient implements KeyValueStoreLink {
/**
* Redis client class.
*/
public class RedisClient {
private static final int JEDIS_POOL_SIZE = 1;
private String redisAddress;
private JedisPool jedisPool;
private int handle = 0;
public RedisClient() {
}
public RedisClient(String addr) {
setAddr(addr);
}
@Override
public synchronized void setAddr(String addr) {
if (StringUtils.isEmpty(redisAddress)) {
redisAddress = addr;
String[] ipPort = addr.split(":");
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//TODO NUM maybe equels to the thread num
jedisPoolConfig.setMaxTotal(1);
jedisPool = new JedisPool(jedisPoolConfig, ipPort[0], Integer.parseInt(ipPort[1]), 30000);
public RedisClient(String redisAddress) {
String[] ipAndPort = redisAddress.split(":");
if (ipAndPort.length != 2) {
throw new IllegalArgumentException("The argument redisAddress " +
"should be formatted as ip:port.");
}
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(JEDIS_POOL_SIZE);
jedisPool = new JedisPool(jedisPoolConfig, ipAndPort[0],
Integer.parseInt(ipAndPort[1]), 30000);
}
@Override
public void checkConnected() throws Exception {
if (jedisPool == null) {
throw new Exception("the GlobalState API can't be used before ray init.");
}
}
@Override
public Long set(final String key, final String value, final String field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
@@ -53,20 +39,6 @@ public class RedisClient implements KeyValueStoreLink {
}
@Override
public Long set(byte[] key, byte[] value, byte[] field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
jedis.set(key, value);
return (long) 1;
} else {
return jedis.hset(key, field, value);
}
}
}
@Override
public String hmset(String key, Map<String, String> hash) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.hmset(key, hash);
@@ -74,28 +46,6 @@ public class RedisClient implements KeyValueStoreLink {
}
@Override
public String hmset(byte[] key, Map<byte[], byte[]> hash) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.hmset(key, hash);
}
}
@Override
public List<String> hmget(String key, String... fields) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.hmget(key, fields);
}
}
@Override
public List<byte[]> hmget(byte[] key, byte[]... fields) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.hmget(key, fields);
}
}
@Override
public String get(final String key, final String field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
@@ -107,7 +57,6 @@ public class RedisClient implements KeyValueStoreLink {
}
@Override
public byte[] get(byte[] key, byte[] field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
@@ -119,111 +68,4 @@ public class RedisClient implements KeyValueStoreLink {
}
@Override
public Long delete(final String key, final String field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
return jedis.del(key);
} else {
return jedis.hdel(key, field);
}
}
}
@Override
public Long delete(byte[] key, byte[] field) {
try (Jedis jedis = jedisPool.getResource()) {
if (field == null) {
return jedis.del(key);
} else {
return jedis.hdel(key, field);
}
}
}
@Override
public Set<byte[]> keys(byte[] pattern) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.keys(pattern);
}
}
@Override
public Set<String> keys(String pattern) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.keys(pattern);
}
}
@Override
public Map<byte[], byte[]> hgetAll(byte[] key) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.hgetAll(key);
}
}
@Override
public List<String> lrange(String key, long start, long end) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.lrange(key, start, end);
}
}
@Override
public Set<byte[]> zrange(byte[] key, long start, long end) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.zrange(key, start, end);
}
}
@Override
public Long rpush(String key, String... strings) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.rpush(key, strings);
}
}
@Override
public Long rpush(byte[] key, byte[]... strings) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.rpush(key, strings);
}
}
@Override
public Long publish(String channel, String message) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.publish(channel, message);
}
}
@Override
public Long publish(byte[] channel, byte[] message) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.publish(channel, message);
}
}
@Override
public Object getImpl() {
return jedisPool;
}
@Override
public byte[] sendCommand(String command, int commandType, byte[] objectId) {
if (handle == 0) {
String[] ipPort = redisAddress.split(":");
handle = connect(ipPort[0], Integer.parseInt(ipPort[1]));
}
return execute_command(handle, command, commandType, objectId);
}
private static native int connect(String redisAddress, int port);
private static native void disconnect(int handle);
private static native byte[] execute_command(int handle,
String command, int commandType, byte[] objectId);
}
@@ -1,36 +0,0 @@
package org.ray.runtime.gcs;
import java.util.List;
import java.util.Set;
/**
* Proxy client for state store, for instance redis.
*/
public interface StateStoreProxy {
/**
* setStore.
* @param rayKvStore the underlying kv store used to store states
*/
void setStore(KeyValueStoreLink rayKvStore);
/**
* initialize the store.
*/
void initializeGlobalState() throws Exception;
/**
* keys.
* @param pattern filter which keys you are interested in.
*/
Set<String> keys(final String pattern);
/**
* getAddressInfo.
* @return list of address information
*/
List<AddressInfo> getAddressInfo(final String nodeIpAddress,
final String redisAddress,
int numRetries);
}
@@ -1,163 +0,0 @@
package org.ray.runtime.gcs;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.ray.api.id.UniqueId;
import org.ray.runtime.generated.ClientTableData;
import org.ray.runtime.util.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A class used to interface with the Ray control state.
*/
public class StateStoreProxyImpl implements StateStoreProxy {
private static final Logger LOGGER = LoggerFactory.getLogger(StateStoreProxyImpl.class);
public KeyValueStoreLink rayKvStore;
public ArrayList<KeyValueStoreLink> shardStoreList = new ArrayList<>();
public StateStoreProxyImpl(KeyValueStoreLink rayKvStore) {
this.rayKvStore = rayKvStore;
}
@Override
public void setStore(KeyValueStoreLink rayKvStore) {
this.rayKvStore = rayKvStore;
}
@Override
public synchronized void initializeGlobalState() throws Exception {
String es;
checkConnected();
String s = rayKvStore.get("NumRedisShards", null);
if (s == null) {
throw new Exception("NumRedisShards not found in redis.");
}
int numRedisShards = Integer.parseInt(s);
if (numRedisShards < 1) {
es = String.format("Expected at least one Redis shard, found %d", numRedisShards);
throw new Exception(es);
}
List<String> ipAddressPorts = rayKvStore.lrange("RedisShards", 0, -1);
Set<String> distinctIpAddress = new HashSet<String>(ipAddressPorts);
if (distinctIpAddress.size() != numRedisShards) {
es = String.format("Expected %d Redis shard addresses, found2 %d.", numRedisShards,
distinctIpAddress.size());
throw new Exception(es);
}
shardStoreList.clear();
for (String ipPort : distinctIpAddress) {
shardStoreList.add(new RedisClient(ipPort));
}
}
public void checkConnected() throws Exception {
rayKvStore.checkConnected();
}
@Override
public synchronized Set<String> keys(final String pattern) {
Set<String> allKeys = new HashSet<>();
Set<String> tmpKey;
for (KeyValueStoreLink ashardStoreList : shardStoreList) {
tmpKey = ashardStoreList.keys(pattern);
allKeys.addAll(tmpKey);
}
return allKeys;
}
@Override
public List<AddressInfo> getAddressInfo(final String nodeIpAddress,
final String redisAddress,
int numRetries) {
int count = 0;
while (count < numRetries) {
try {
return doGetAddressInfo(nodeIpAddress, redisAddress);
} catch (Exception e) {
try {
LOGGER.warn("Error occurred in StateStoreProxyImpl getAddressInfo, {} retries remaining",
(numRetries - count), e);
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException ie) {
LOGGER.error("error at StateStoreProxyImpl getAddressInfo", e);
throw new RuntimeException(e);
}
}
count++;
}
throw new RuntimeException("cannot get address info from state store");
}
/**
* Get address info of one node from primary redis.
* This method only tries to get address info once, without any retry.
*
* @param nodeIpAddress Usually local ip address.
* @param redisAddress The primary redis address.
* @return A list of SchedulerInfo which contains node manager or local scheduler address info.
* @throws Exception No redis client exception.
*/
public List<AddressInfo> doGetAddressInfo(final String nodeIpAddress,
final String redisAddress) throws Exception {
if (this.rayKvStore == null) {
throw new Exception("no redis client when use doGetAddressInfo");
}
List<AddressInfo> schedulerInfo = new ArrayList<>();
byte[] prefix = "CLIENT".getBytes();
byte[] postfix = UniqueId.genNil().getBytes();
byte[] clientKey = new byte[prefix.length + postfix.length];
System.arraycopy(prefix, 0, clientKey, 0, prefix.length);
System.arraycopy(postfix, 0, clientKey, prefix.length, postfix.length);
Set<byte[]> clients = rayKvStore.zrange(clientKey, 0, -1);
for (byte[] clientMessage : clients) {
ByteBuffer bb = ByteBuffer.wrap(clientMessage);
ClientTableData client = ClientTableData.getRootAsClientTableData(bb);
String clientNodeIpAddress = client.nodeManagerAddress();
String localIpAddress = NetworkUtil.getIpAddress(null);
String redisIpAddress = redisAddress.substring(0, redisAddress.indexOf(':'));
boolean headNodeAddress = "127.0.0.1".equals(clientNodeIpAddress)
&& Objects.equals(redisIpAddress, localIpAddress);
boolean notHeadNodeAddress = Objects.equals(clientNodeIpAddress, nodeIpAddress);
if (headNodeAddress || notHeadNodeAddress) {
AddressInfo si = new AddressInfo();
si.storeName = client.objectStoreSocketName();
si.rayletSocketName = client.rayletSocketName();
si.managerRpcAddr = client.nodeManagerAddress();
si.managerPort = client.nodeManagerPort();
schedulerInfo.add(si);
}
}
return schedulerInfo;
}
protected String charsetDecode(byte[] bs, String charset) throws UnsupportedEncodingException {
return new String(bs, charset);
}
protected byte[] charsetEncode(String str, String charset) throws UnsupportedEncodingException {
if (str != null) {
return str.getBytes(charset);
}
return null;
}
}