Spring Cache是一个强大的缓存抽象层,提供了统一的缓存操作接口,但原生支持主要集中在单键操作。在高并发场景下,批量操作能力对于提升系统性能至关重要。本文将深入探讨如何通过继承Cache接口,以RedisCache为基础,实现兼容Spring Cache规范的BatchCache扩展。
一、Spring Cache架构与RedisCache源码剖析
1.1 Spring Cache核心组件
Spring Cache的核心是
org.springframework.cache.Cache
接口,定义了缓存的基本操作:public interface Cache { String getName(); Object getNativeCache(); ValueWrapper get(Object key); <T> T get(Object key, Class<T> type); <T> T get(Object key, Callable<T> valueLoader); void put(Object key, Object value); ValueWrapper putIfAbsent(Object key, Object value); void evict(Object key); void clear(); }
Java
主要实现类包括:
- ConcurrentMapCache:基于ConcurrentHashMap的简单实现
- RedisCache:Redis集成实现
- CaffeineCache:Caffeine缓存实现
1.2 RedisCache源码深入分析
RedisCache是Spring Data Redis提供的缓存实现,核心类结构如下:
public class RedisCache extends AbstractValueAdaptingCache { private final String name; private final RedisCacheWriter cacheWriter; private final RedisCacheConfiguration cacheConfig; // 构造函数和核心方法实现 }
Java
核心组件解析:
- RedisCacheWriter:负责与Redis通信的底层接口
- RedisCacheConfiguration:缓存配置,如序列化器、TTL等
- AbstractValueAdaptingCache:提供缓存值处理的基础实现
1.3 RedisCacheWriter源码分析
RedisCacheWriter是Redis操作的核心接口:
public interface RedisCacheWriter { void put(String name, byte[] key, byte[] value, @Nullable Duration ttl); byte[] get(String name, byte[] key); byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Duration ttl); void remove(String name, byte[] key); void clean(String name, byte[] pattern); }
Java
主要实现类:
- DefaultRedisCacheWriter:基于RedisTemplate的默认实现
- LettuceRedisCacheWriter:基于Lettuce客户端的优化实现
- JedisRedisCacheWriter:基于Jedis客户端的实现
二、BatchCache接口设计与实现思路
2.1 BatchCache接口定义
为了实现批量操作能力,我们需要定义一个扩展接口:
/** * 扩展Spring Cache接口,提供批量操作能力 * * @author doubao */ public interface BatchCache extends Cache { /** * 批量获取缓存值 * * @param keys 缓存键集合 * @return 键值对映射,不存在的键对应的值为null */ Map<Object, Object> getAll(Collection<?> keys); /** * 批量存入缓存值 * * @param values 键值对映射 */ void putAll(Map<?, ?> values); /** * 批量存入缓存值,仅当键不存在时 * * @param values 键值对映射 * @return 实际存入的键值对映射,已存在的键对应的值为null */ Map<Object, Object> putAllIfAbsent(Map<?, ?> values); /** * 批量删除缓存 * * @param keys 缓存键集合 */ void evictAll(Collection<?> keys); }
Java
2.2 实现思路概述
实现BatchCache接口的核心思路:
- 继承RedisCache类,复用现有功能
- 扩展RedisCacheWriter接口,添加批量操作方法
- 实现BatchRedisCache类,实现BatchCache接口
- 提供配置类,注册自定义CacheManager
- 确保与Spring Cache生态系统兼容
三、核心代码实现
3.1 扩展RedisCacheWriter接口
/** * 扩展RedisCacheWriter接口,添加批量操作方法 * * @author doubao */ public interface BatchRedisCacheWriter extends RedisCacheWriter { /** * 批量获取缓存值 * * @param name 缓存名称 * @param keys 缓存键集合(字节数组形式) * @return 键值对映射,不存在的键对应的值为null */ Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys); /** * 批量存入缓存值 * * @param name 缓存名称 * @param values 键值对映射(字节数组形式) * @param ttl 过期时间,null表示不过期 */ void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl); /** * 批量删除缓存 * * @param name 缓存名称 * @param keys 缓存键集合(字节数组形式) */ void removeAll(String name, Collection<byte[]> keys); }
Java
3.2 实现BatchRedisCacheWriter
/** * RedisCacheWriter的批量操作实现 * * @author doubao */ public class DefaultBatchRedisCacheWriter implements BatchRedisCacheWriter { private final RedisTemplate<byte[], byte[]> redisTemplate; private final Duration sleepTime; /** * 构造函数 * * @param redisOperations Redis操作模板 */ public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations) { this(redisOperations, Duration.ZERO); } /** * 构造函数 * * @param redisOperations Redis操作模板 * @param sleepTime 重试间隔时间 */ public DefaultBatchRedisCacheWriter(RedisOperations<byte[], byte[]> redisOperations, Duration sleepTime) { Assert.notNull(redisOperations, "RedisOperations must not be null!"); Assert.notNull(sleepTime, "SleepTime must not be null!"); this.redisTemplate = (RedisTemplate<byte[], byte[]>) redisOperations; this.sleepTime = sleepTime; } @Override public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) { execute(name, connection -> { if (shouldExpireWithin(ttl)) { connection.setEx(key, ttl.getSeconds(), value); } else { connection.set(key, value); } return "OK"; }); } @Override public byte[] get(String name, byte[] key) { return execute(name, connection -> connection.get(key)); } // 其他方法实现... @Override public Map<byte[], byte[]> getAll(String name, Collection<byte[]> keys) { return execute(name, connection -> { List<byte[]> values = connection.mGet(keys.toArray(new byte[0][])); Map<byte[], byte[]> result = new LinkedHashMap<>(keys.size()); int index = 0; for (byte[] key : keys) { result.put(key, index < values.size() ? values.get(index) : null); index++; } return result; }); } @Override public void putAll(String name, Map<byte[], byte[]> values, @Nullable Duration ttl) { execute(name, connection -> { if (shouldExpireWithin(ttl)) { Pipeline pipeline = connection.openPipeline(); for (Map.Entry<byte[], byte[]> entry : values.entrySet()) { pipeline.setEx(entry.getKey(), ttl.getSeconds(), entry.getValue()); } pipeline.close(); } else { Map<byte[], byte[]> nonNullValues = values.entrySet().stream() .filter(e -> e.getValue() != null) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (!nonNullValues.isEmpty()) { connection.mSet(nonNullValues); } } return "OK"; }); } @Override public void removeAll(String name, Collection<byte[]> keys) { execute(name, connection -> { if (!keys.isEmpty()) { connection.del(keys.toArray(new byte[0][])); } return "OK"; }); } // 辅助方法... private <T> T execute(String name, RedisCallback<T> callback) { try { return redisTemplate.execute(callback); } catch (Exception ex) { throw new CacheOperationFailedException(name, "Redis batch operation failed", ex); } } private boolean shouldExpireWithin(@Nullable Duration ttl) { return ttl != null && !ttl.isZero() && !ttl.isNegative(); } }
Java
3.3 实现BatchRedisCache类
/** * 支持批量操作的Redis缓存实现 * * @author doubao */ public class BatchRedisCache extends RedisCache implements BatchCache { private final BatchRedisCacheWriter cacheWriter; private final RedisSerializationContext<Object, Object> serializationContext; /** * 构造函数 * * @param name 缓存名称 * @param cacheWriter 缓存写入器 * @param cacheConfig 缓存配置 */ protected BatchRedisCache(String name, BatchRedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfig) { super(name, cacheWriter, cacheConfig); this.cacheWriter = cacheWriter; this.serializationContext = cacheConfig.getSerializationContext(); } @Override public Map<Object, Object> getAll(Collection<?> keys) { // 转换键为字节数组 Map<Object, byte[]> keyMappings = new LinkedHashMap<>(keys.size()); for (Object key : keys) { keyMappings.put(key, serializeCacheKey(createCacheKey(key))); } // 批量获取缓存值 Map<byte[], byte[]> results = cacheWriter.getAll( getName(), keyMappings.values()); // 反序列化结果 Map<Object, Object> valueMappings = new LinkedHashMap<>(results.size()); for (Map.Entry<Object, byte[]> entry : keyMappings.entrySet()) { byte[] valueBytes = results.get(entry.getValue()); valueMappings.put(entry.getKey(), deserializeCacheValue(valueBytes)); } return valueMappings; } @Override public void putAll(Map<?, ?> values) { // 序列化键值对 Map<byte[], byte[]> serializedValues = new LinkedHashMap<>(values.size()); for (Map.Entry<?, ?> entry : values.entrySet()) { if (entry.getValue() != null) { String cacheKey = createCacheKey(entry.getKey()); byte[] keyBytes = serializeCacheKey(cacheKey); byte[] valueBytes = serializeCacheValue(entry.getValue()); serializedValues.put(keyBytes, valueBytes); } } // 批量存入缓存 cacheWriter.putAll(getName(), serializedValues, getTtl()); } @Override public Map<Object, Object> putAllIfAbsent(Map<?, ?> values) { // 实现略,需要使用Redis事务或Lua脚本确保原子性 throw new UnsupportedOperationException("Batch putIfAbsent operation is not supported yet."); } @Override public void evictAll(Collection<?> keys) { // 转换键为字节数组 Collection<byte[]> keyBytes = keys.stream() .map(key -> serializeCacheKey(createCacheKey(key))) .collect(Collectors.toList()); // 批量删除缓存 cacheWriter.removeAll(getName(), keyBytes); } // 辅助方法... private byte[] serializeCacheKey(String cacheKey) { return serializationContext.getKeySerializationPair().write(cacheKey); } private byte[] serializeCacheValue(Object value) { return serializationContext.getValueSerializationPair().write(value); } private Object deserializeCacheValue(byte[] valueBytes) { if (valueBytes == null) { return null; } return serializationContext.getValueSerializationPair().read(valueBytes); } private String createCacheKey(Object key) { String convertedKey = convertKey(key); if (!getCacheConfiguration().usePrefix()) { return convertedKey; } return getCacheConfiguration().getKeyPrefixFor(name) + convertedKey; } }
Java
3.4 实现BatchRedisCacheManager
/** * 支持批量操作的Redis缓存管理器 * * @author doubao */ public class BatchRedisCacheManager extends RedisCacheManager { private final BatchRedisCacheWriter cacheWriter; private final RedisCacheConfiguration defaultCacheConfig; /** * 构造函数 * * @param cacheWriter 缓存写入器 * @param defaultCacheConfiguration 默认缓存配置 */ public BatchRedisCacheManager(BatchRedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) { super(cacheWriter, defaultCacheConfiguration); this.cacheWriter = cacheWriter; this.defaultCacheConfig = defaultCacheConfiguration; } @Override protected RedisCache createRedisCache(String name, @Nullable RedisCacheConfiguration cacheConfig) { return new BatchRedisCache(name, cacheWriter, cacheConfig != null ? cacheConfig : defaultCacheConfig); } /** * 从RedisCacheManager转换为BatchRedisCacheManager * * @param cacheManager Redis缓存管理器 * @return 支持批量操作的Redis缓存管理器 */ public static BatchRedisCacheManager fromRedisCacheManager(RedisCacheManager cacheManager) { // 获取RedisCacheManager的私有字段 Field cacheWriterField; Field defaultCacheConfigField; try { cacheWriterField = RedisCacheManager.class.getDeclaredField("cacheWriter"); defaultCacheConfigField = RedisCacheManager.class.getDeclaredField("defaultCacheConfig"); cacheWriterField.setAccessible(true); defaultCacheConfigField.setAccessible(true); RedisCacheWriter cacheWriter = (RedisCacheWriter) cacheWriterField.get(cacheManager); RedisCacheConfiguration defaultCacheConfig = (RedisCacheConfiguration) defaultCacheConfigField.get(cacheManager); // 创建BatchRedisCacheWriter BatchRedisCacheWriter batchCacheWriter; if (cacheWriter instanceof DefaultRedisCacheWriter) { DefaultRedisCacheWriter defaultWriter = (DefaultRedisCacheWriter) cacheWriter; // 使用反射获取RedisOperations Field redisOperationsField = DefaultRedisCacheWriter.class.getDeclaredField("redisOperations"); redisOperationsField.setAccessible(true); RedisOperations<byte[], byte[]> redisOperations = (RedisOperations<byte[], byte[]>) redisOperationsField.get(defaultWriter); batchCacheWriter = new DefaultBatchRedisCacheWriter(redisOperations); } else { // 回退方案,使用RedisTemplate创建 RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(cacheManager.getCacheWriter().getConnectionFactory()); redisTemplate.afterPropertiesSet(); batchCacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate); } // 创建BatchRedisCacheManager return new BatchRedisCacheManager(batchCacheWriter, defaultCacheConfig); } catch (NoSuchFieldException | IllegalAccessException e) { throw new RuntimeException("Failed to convert RedisCacheManager to BatchRedisCacheManager", e); } } }
Java
3.5 配置类实现
/** * 批量缓存配置类 * * @author doubao */ @Configuration public class BatchCacheConfiguration { @Bean public BatchRedisCacheManager batchRedisCacheManager(RedisConnectionFactory redisConnectionFactory) { // 创建默认配置 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofMinutes(10)) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer())); // 创建BatchRedisCacheWriter RedisTemplate<byte[], byte[]> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.afterPropertiesSet(); BatchRedisCacheWriter cacheWriter = new DefaultBatchRedisCacheWriter(redisTemplate); // 创建BatchRedisCacheManager BatchRedisCacheManager cacheManager = new BatchRedisCacheManager(cacheWriter, config); cacheManager.setTransactionAware(true); // 设置缓存名称和配置的映射 Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>(); // 可以为不同的缓存名称配置不同的策略 cacheConfigurations.put("batchCache", config.entryTtl(Duration.ofHours(1))); cacheManager.setCacheConfigurations(cacheConfigurations); return cacheManager; } /** * 自定义CacheResolver,支持BatchCache * * @param cacheManager 缓存管理器 * @return 缓存解析器 */ @Bean public CacheResolver batchCacheResolver(BatchRedisCacheManager cacheManager) { return new SimpleCacheResolver(cacheManager) { @Override protected Collection<? extends Cache> getCaches(CacheOperationInvocationContext<?> context) { Collection<? extends Cache> caches = super.getCaches(context); return caches.stream() .map(cache -> { if (cache instanceof RedisCache && !(cache instanceof BatchRedisCache)) { // 将普通RedisCache转换为BatchRedisCache RedisCache redisCache = (RedisCache) cache; return new BatchRedisCache( redisCache.getName(), (BatchRedisCacheWriter) cacheManager.getCacheWriter(), redisCache.getCacheConfiguration()); } return cache; }) .collect(Collectors.toList()); } }; } }
Java
四、使用示例
4.1 定义业务服务
/** * 示例服务类,演示BatchCache的使用 * * @author doubao */ @Service public class UserService { private final UserRepository userRepository; @Autowired public UserService(UserRepository userRepository) { this.userRepository = userRepository; } /** * 批量获取用户信息,使用缓存 * * @param userIds 用户ID集合 * @return 用户信息映射 */ @Cacheable(value = "users", key = "#root.methodName + '_' + #userIds", unless = "#result == null") public Map<Long, User> getUsersBatch(Collection<Long> userIds) { // 模拟从数据库获取数据 return userIds.stream() .collect(Collectors.toMap( Function.identity(), userId -> userRepository.findById(userId).orElse(null) )); } /** * 批量保存用户信息,并更新缓存 * * @param users 用户信息集合 */ @CachePut(value = "users", key = "#root.methodName + '_' + #users.![id]", condition = "#users != null && !#users.isEmpty()") public Map<Long, User> saveUsersBatch(Collection<User> users) { // 模拟批量保存到数据库 users.forEach(userRepository::save); // 返回保存后的用户信息 return users.stream() .collect(Collectors.toMap(User::getId, Function.identity())); } /** * 批量删除用户信息,并清除缓存 * * @param userIds 用户ID集合 */ @CacheEvict(value = "users", allEntries = false, key = "#root.methodName + '_' + #userIds") public void deleteUsersBatch(Collection<Long> userIds) { // 模拟批量删除 userIds.forEach(userRepository::deleteById); } /** * 直接使用BatchCache接口的批量操作 * * @param userIds 用户ID集合 * @return 用户信息映射 */ public Map<Long, User> getUsersBatchWithBatchCache(Collection<Long> userIds) { // 通过ApplicationContext获取BatchCache BatchCache batchCache = (BatchCache) applicationContext.getBean("cacheManager").getCache("users"); // 直接使用BatchCache的批量获取方法 Map<Object, Object> cacheResults = batchCache.getAll(userIds); // 处理缓存结果 Map<Long, User> result = new HashMap<>(); for (Map.Entry<Object, Object> entry : cacheResults.entrySet()) { Long userId = (Long) entry.getKey(); User user = (User) entry.getValue(); if (user == null) { // 缓存未命中,从数据库获取 user = userRepository.findById(userId).orElse(null); if (user != null) { // 手动放入缓存 batchCache.put(userId, user); } } result.put(userId, user); } return result; } }
Java
4.2 配置文件示例
spring: redis: host: localhost port: 6379 password: timeout: 10000ms lettuce: pool: max-active: 8 max-wait: -1ms max-idle: 8 min-idle: 0 cache: type: redis redis: time-to-live: 600000 # 10分钟 cache-null-values: true use-key-prefix: true key-prefix: batch_cache:
YAML
五、性能测试与优化
5.1 性能测试框架
/** * BatchCache性能测试 * * @author doubao */ @SpringBootTest public class BatchCachePerformanceTest { @Autowired private UserService userService; @Autowired private CacheManager cacheManager; private static final int TEST_SIZE = 1000; private static final int WARMUP_TIMES = 10; private static final int TEST_TIMES = 100; @BeforeEach public void setUp() { // 准备测试数据 List<User> users = new ArrayList<>(TEST_SIZE); for (int i = 0; i < TEST_SIZE; i++) { User user = new User(); user.setId((long) i); user.setName("User" + i); user.setAge(20 + i % 30); users.add(user); } // 预热 for (int i = 0; i < WARMUP_TIMES; i++) { userService.saveUsersBatch(users); userService.getUsersBatch(users.stream().map(User::getId).collect(Collectors.toList())); } // 清除缓存 Cache usersCache = cacheManager.getCache("users"); if (usersCache != null) { usersCache.clear(); } } @Test public void testBatchGetPerformance() { List<Long> userIds = IntStream.range(0, TEST_SIZE) .mapToObj(Long::valueOf) .collect(Collectors.toList()); // 测试单键获取性能 long singleStartTime = System.currentTimeMillis(); for (int i = 0; i < TEST_TIMES; i++) { for (Long userId : userIds) { userService.getUser(userId); } } long singleEndTime = System.currentTimeMillis(); long singleTotalTime = singleEndTime - singleStartTime; // 测试批量获取性能 long batchStartTime = System.currentTimeMillis(); for (int i = 0; i < TEST_TIMES; i++) { userService.getUsersBatch(userIds); } long batchEndTime = System.currentTimeMillis(); long batchTotalTime = batchEndTime - batchStartTime; // 输出性能结果 System.out.printf("单键获取 %d 次,总耗时: %d ms,平均每次: %f ms%n", TEST_SIZE * TEST_TIMES, singleTotalTime, (double) singleTotalTime / (TEST_SIZE * TEST_TIMES)); System.out.printf("批量获取 %d 次,总耗时: %d ms,平均每次: %f ms%n", TEST_TIMES, batchTotalTime, (double) batchTotalTime / TEST_TIMES); System.out.printf("批量操作性能提升: %.2f%%%n", (1 - (double) batchTotalTime / singleTotalTime) * 100); } // 其他性能测试方法... }
Java
5.2 性能优化策略
- 批量操作优化:
- 使用Redis的MGET、MSET等批量命令
- 合理设置批量操作的大小,避免单次操作过大
- 考虑使用Redis Pipeline提升性能
- 序列化优化:
- 使用高效的序列化方式,如Kryo、Protostuff等
- 避免序列化大对象,可考虑拆分数据
- 缓存配置优化:
- 根据业务场景设置合理的TTL
- 使用分区缓存,避免不同业务数据相互影响
- 考虑使用二级缓存(如Caffeine + Redis)提升性能
六、注意事项与最佳实践
6.1 使用注意事项
- 事务支持:
- Spring Cache的@Cacheable、@CachePut等注解不支持事务回滚
- 如果需要事务支持,建议在业务代码中直接使用BatchCache接口
- 异常处理:
- 批量操作可能部分成功部分失败,需要业务层处理这种情况
- 考虑实现重试机制,确保操作的最终一致性
- 缓存穿透与雪崩:
- 批量操作同样需要防范缓存穿透问题
- 合理设置不同数据的TTL,避免缓存雪崩
6.2 最佳实践
- 批量操作大小控制:
- 对于大量数据的批量操作,建议分批处理
- 每批大小可根据网络情况和Redis性能调整,一般建议在100-1000之间
- 缓存预热:
- 对于热点数据,启动时进行缓存预热
- 使用BatchCache的批量操作能力快速填充缓存
- 监控与告警:
- 监控批量操作的性能指标,如QPS、响应时间等
- 设置合理的告警阈值,及时发现性能问题
通过以上方案,我们成功地扩展了Spring Boot Cache的功能,实现了兼容RedisCache的BatchCache接口。这种实现方式不仅保持了与Spring Cache生态的兼容性,还显著提升了批量数据操作的性能,为高并发场景下的应用提供了有力支持。
- 作者:Honesty
- 链接:https://blog.hehouhui.cn/archives/2370c7d0-9e17-801d-9570-f0f96f677f03
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。