背景
最近接到一个需求,redis中的变量失效时,需要记录到日志中。查了一下,可以通过订阅redis事件,java中进行处理。订阅事件会阻塞当前进程,所以需要开线程处理。后台服务是SpringBoot搭建,而且是多实例,为了确保每个事件仅消费一次,适用redis锁。
Redis设置
redis中需要开启事件通知,默认是不开启的。
- 修改redis.conf文件,设置notify-keyspace-events 为 Ex;
- redis中执行
config set notify-keyspace-events Ex
(该方式重启redis失效);
参数 |
作用 |
K |
键空间通知,以__keyspace@<db>__ 为前缀 |
E |
键事件通知,以__keysevent@<db>__ 为前缀 |
g |
del , expipre , rename 等类型无关的通用命令的通知 |
$ |
String命令 |
l |
List命令 |
s |
Set命令 |
h |
Hash命令 |
z |
有序集合命令 |
x |
过期时间(每次key过期时生成) |
e |
去住时间(当key在内存存满了被清除时生成) |
A |
g$lshzxe的别名,因此“AKE”意味着所有事件 |
Java代码
设置JedisPool
增加jedis依赖
pom文件增加:
1 2 3 4
| <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
|
yml中增加配置
1 2 3 4 5 6 7 8 9 10
| jedis: max: total: 100 idle: 10 waitmillis: 10000 timeout: 10000 testOnBorrow: true host: localhost port: 6379 password: 123456
|
实例化JedisPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.springboot.common;
import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig;
@Configuration @EnableCaching public class JedisConfig extends CachingConfigurerSupport{ @Value("${jedis.port}") private int port; @Value("${jedis.host}") private String host; @Value("${jedis.max.total}") private Integer maxTotal; @Value("${jedis.max.idle}") private Integer maxIdle; @Value("${jedis.max.waitmillis}") private Long maxWaitMillis; @Value("${jedis.max.timeout}") private int timeout; @Value("${jedis.password}") private String password;
@Bean public JedisPool redisPoolFactory() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password); return jedisPool; } }
|
创建监听类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.springboot.common;
import com.springboot.util.ApplicationContextUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub;
import javax.annotation.PostConstruct;
@Component public class RedisListener extends JedisPubSub{
private static final String INSTANCE_LOCK = "INSTANCE_LOCK";
@Autowired private JedisPool jedisPool; Jedis jedis = null;
@Override public void onPMessage(String pattern, String channel, String message) {
jedisPool = ApplicationContextUtil.getBean(JedisPool.class); jedis = jedisPool.getResource();
Long lock = jedis.setnx(INSTANCE_LOCK, "true"); if (lock == 0) { jedis.close(); return ; }
System.out.println(pattern + "=" + channel + "=" + message); jedis.del(INSTANCE_LOCK); jedis.close(); }
@PostConstruct void init() { SubThread subThread = new SubThread(jedisPool); Thread thread = new Thread(subThread); thread.start(); } }
|
一开始的方案,实例共用一个锁,实际发现可能会有实例A处理完,实例B才开始处理,导致处理多次的情况。
然后优化为将锁时间加长的方案。
而如果将锁时间加长,会导致多台实例处理是阻塞,进而优化为每个Key加锁、并延长锁时间的方案。
修改后如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Override public void onPMessage(String pattern, String channel, String message) {
if (StringUtils.startsWith(message, KEYS_LOCK)) { return; }
jedisPool = ApplicationContextUtil.getBean(JedisPool.class); jedis = jedisPool.getResource();
Long lock = jedis.setnx(KEYS_LOCK + message, "true"); jedis.expire(KEYS_LOCK + message, 5); if (lock == 0) { jedis.close(); return ; }
System.out.println(pattern + "=" + channel + "=" + message); jedis.del(KEYS_LOCK); jedis.close(); }
|
创建线程类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.springboot.common;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool;
public class SubThread implements Runnable {
private JedisPool jedisPool;
String pattern = "*";
public SubThread(JedisPool jedisPool) { this.jedisPool = jedisPool; }
@Override public void run() { while(true) { Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe(new RedisListener(), pattern); } catch (Exception e) { e.printStackTrace(); } finally { jedis.close(); } } } }
|
创建ApplicationContextUtil类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.springboot.util;
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;
@Component public class ApplicationContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (ApplicationContextUtil.applicationContext == null) { ApplicationContextUtil.applicationContext = applicationContext; } }
public static ApplicationContext getApplicationContext() { return applicationContext; }
public static Object getBean(String name) { return ApplicationContextUtil.getApplicationContext().getBean(name); }
public static <T>T getBean(Class<T> clazz) { return ApplicationContextUtil.getApplicationContext().getBean(clazz); }
public static <T>T getBean(String name, Class<T> clazz) { return ApplicationContextUtil.getApplicationContext().getBean(name, clazz); } }
|