今天查看文章详情的时候看到阅读数是每次刷新都实时增加的,之前也做过大量的资讯相关的业务所以对这一块一直都比较敏感。所以就想动手调整一下
Halo阅读数新增实现
通过源码我们看到Halo的阅读数是通过监听器来实现的,有一个抽象类 ***AbstractVisitEventListener***其中有两个实现
- SheetVisitEventListener
- PostVisitEventListener
实现类没什么代码就不看了,主要看AbstractVisitEventListener
public abstract class AbstractVisitEventListener {
/**
* 阅读数Map key为文章ID,value为一个堵塞队列
*/
private final Map<Integer, BlockingQueue<Integer>> visitQueueMap;
/**
* 阅读数任务map key为文章ID,value为一个线程
*/
private final Map<Integer, PostVisitTask> visitTaskMap;
/**
* db-service 接口
*/
private final BasePostService basePostService;
/**
* 线程词
*/
private final ExecutorService executor;
protected AbstractVisitEventListener(BasePostService basePostService) {
this.basePostService = basePostService;
int initCapacity = 8;
long count = basePostService.count();
if (count < initCapacity) {
initCapacity = (int) count;
}
visitQueueMap = new ConcurrentHashMap<>(initCapacity << 1);
visitTaskMap = new ConcurrentHashMap<>(initCapacity << 1);
this.executor = Executors.newCachedThreadPool();
}
/**
* Handle visit event.
*
* @param event visit event must not be null
* @throws InterruptedException
*/
protected void handleVisitEvent(@NonNull AbstractVisitEvent event) throws InterruptedException {
Assert.notNull(event, "Visit event must not be null");
// Get post id
Integer id = event.getId();
log.debug("Received a visit event, post id: [{}]", id);
/**
* 当监听到事件,如果没有队列就新增一个队列
*/
// Get post visit queue
BlockingQueue<Integer> postVisitQueue = visitQueueMap.computeIfAbsent(id, this::createEmptyQueue);
/**
* 如果文章没有线程就新增一个线程
*/
visitTaskMap.computeIfAbsent(id, this::createPostVisitTask);
// Put a visit for the post
postVisitQueue.put(id);
}
/**
* 创建线程
* @param postId
* @return
*/
private PostVisitTask createPostVisitTask(Integer postId) {
// Create new post visit task
PostVisitTask postVisitTask = new PostVisitTask(postId);
/**
* 执行线程
*/
// Start a post visit task
executor.execute(postVisitTask);
log.debug("Created a new post visit task for post id: [{}]", postId);
return postVisitTask;
}
/**
* 创建队列
* @param postId
* @return
*/
private BlockingQueue<Integer> createEmptyQueue(Integer postId) {
// Create a new queue
return new LinkedBlockingQueue<>();
}
/**
* Post visit task.
*/
private class PostVisitTask implements Runnable {
private final Integer id;
private PostVisitTask(Integer id) {
this.id = id;
}
@Override
public void run() {
/**
* 先获取线程中断状态,然后在根据参数决定是否重置中断状态,true重置,false不重置。
*/
while (!Thread.currentThread().isInterrupted()) {
try {
BlockingQueue<Integer> postVisitQueue = visitQueueMap.get(id);
/**
* 一直阻塞,直到队列不为空或者线程被中断-->阻塞
*/
Integer postId = postVisitQueue.take();
log.debug("Took a new visit for post id: [{}]", postId);
/**
* 新增阅读数
*/
// Increase the visit
basePostService.increaseVisit(postId);
log.debug("Increased visits for post id: [{}]", postId);
} catch (InterruptedException e) {
log.debug("Post visit task: " + Thread.currentThread().getName() + " was interrupted", e);
// Ignore this exception
}
}
log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
}
}
}
当监听到事件时
会先初始化队列和一个线程
Assert.notNull(event, "Visit event must not be null");
// Get post id
Integer id = event.getId();
log.debug("Received a visit event, post id: [{}]", id);
/**
* 当监听到事件,如果没有队列就新增一个队列
*/
// Get post visit queue
BlockingQueue<Integer> postVisitQueue = visitQueueMap.computeIfAbsent(id, this::createEmptyQueue);
/**
* 如果文章没有线程就新增一个线程
*/
visitTaskMap.computeIfAbsent(id, this::createPostVisitTask);
// Put a visit for the post
postVisitQueue.put(id);
线程处理
线程中一个死循环一直在跑,如果队列中有值就执行新增阅读的方法。如果没值就一直堵塞当前线程。线程池Executors.newCachedThreadPool()使用max为Integer.max的参数 也没有控制最大值
如果在范围时间内大量访问创建大量线程?对服务器开销就特别大了,特别是一些小内存(如我就开128M)特别容易被攻击
/**
* Post visit task.
*/
private class PostVisitTask implements Runnable {
private final Integer id;
private PostVisitTask(Integer id) {
this.id = id;
}
@Override
public void run() {
/**
* 先获取线程中断状态,然后在根据参数决定是否重置中断状态,true重置,false不重置。
*/
while (!Thread.currentThread().isInterrupted()) {
try {
BlockingQueue<Integer> postVisitQueue = visitQueueMap.get(id);
/**
* 一直阻塞,直到队列不为空或者线程被中断-->阻塞
*/
Integer postId = postVisitQueue.take();
log.debug("Took a new visit for post id: [{}]", postId);
/**
* 新增阅读数
*/
// Increase the visit
basePostService.increaseVisit(postId);
log.debug("Increased visits for post id: [{}]", postId);
} catch (InterruptedException e) {
log.debug("Post visit task: " + Thread.currentThread().getName() + " was interrupted", e);
// Ignore this exception
}
}
log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
}
}
重构后
public class AbstractSmoothVisitEventListener {
private final Read<Double,Integer> read;
public AbstractSmoothVisitEventListener(BasePostService basePostService){
/**
* 初始化阅读器
* so: 当文章阅读数 >= 30时会入库 or 每5分钟所有有阅读数当文章都会入库
*/
read = new LocalCacheRead<>(30D, 300, new VisitReadStorage(basePostService), "post:read");
}
/**
* Handle visit event.
*
* @param event visit event must not be null
* @throws InterruptedException
*/
protected void handleVisitEvent(@NonNull AbstractVisitEvent event) throws InterruptedException{
Assert.notNull(event, "Visit event must not be null");
// Get post id
Integer id = event.getId();
read.read(id,1D,null);
}
static class VisitReadStorage implements ReadStorage<Double,Integer>{
private BasePostService postService;
VisitReadStorage(BasePostService basePostService){
this.postService = basePostService;
}
@Override
public void increase(Integer posId, Double n) {
postService.increaseVisit(n.longValue(),posId);
log.info("新增阅读数:{},postId:{}",n,posId);
}
@Override
public void increase(Map<Integer, Double> map) {
postService.increaseListVisit(map);
log.info("定时清楚阅读数");
}
}
}
Read介绍
自定义的一个阅读数接口,用来平缓的增加阅读数
/**
* 读取
* @param key 阅读的key
* @param n 增加数
* @param clientID 客户端ID (非必填)
*/
void read(ID key,T n,String clientID);
/**
* 查询指定key的阅读数
* @param key 阅读key
* @return {@link Optional<T>}
*/
Optional<T> getRead(ID key);
/**
* 查询多个key的阅读数
* @param keys 阅读keys
* @return {@link Optional<Map<ID,T>>}
*/
Optional<Map<ID,T>> getReads(List<ID> keys);
ReadAbstract抽象类
抽取控制达到指定阅读数和定时入库的操作
public abstract class ReadAbstract<T extends Number,ID> implements Read<T,ID> {
/**
* 最大读取值,当达到此读取值时。数据入库
*/
private final T maxRead;
/**
* 定时多少秒 入库一次
*/
protected final int jobSeconds;
/**
* 阅读数存储
*/
protected final ReadStorage<T,ID> readStorage;
/**
* 是否有过读取(决定是否处理任务)
*/
private AtomicInteger isRead = new AtomicInteger(0);
/**
* 阅读文摘
*
* @param maxRead 最大阅读数
* @param jobSeconds 定时秒
* @param readStorage 读取存储
*/
protected ReadAbstract(T maxRead, int jobSeconds, ReadStorage<T, ID> readStorage) {
assert readStorage != null : "ReadStorage不能位空";
assert maxRead != null : "最大阅读数不能位空";
assert maxRead.doubleValue() > 0.01D : "最大阅读数不能小于0.01";
assert jobSeconds > 60 : "定时秒不能小于60";
this.maxRead = maxRead;
this.jobSeconds = jobSeconds;
this.readStorage = readStorage;
this.initJob();
}
/**
* 初始化job
*/
private void initJob(){
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleWithFixedDelay(this::perform,3,jobSeconds, TimeUnit.SECONDS);
}
/**
* 执行
*
*/
protected void perform(){
if (isRead.updateAndGet(r -> {
return r > 0 ? -1 : 0;
}) == -1) {
this.getAll().ifPresent(data -> {
readStorage.increase(data);
this.clear();
});
}
}
/**
* 读取
*
* @param key 阅读的key
* @param n 增加数
* @param clientID 客户端ID (非必填)
*/
@Override
public void read(ID key, T n, String clientID) {
//TODO if clientID not null ?
isRead.updateAndGet(r -> {
return r == -1 ? 1 : (r + 1);
});
this.increase(key, n).ifPresent(d -> {
/**
* 达到最大数条件
*/
if(d.doubleValue() >= maxRead.doubleValue()){
readStorage.increase(key,d);
/**
* 缓存中马上减少对应数
*/
this.reduce(key,d);
}
});
}
/**
* 增加
*
* @param key 关键
* @param n 阅读数
* @return {@link T} 返回阅读数
*/
abstract Optional<T> increase(ID key, T n);
/**
* 减少
*
* @param key 关键
* @param n 阅读数
*/
abstract void reduce(ID key,T n);
/**
* 得到所有
*
* @return {@link Map<ID, T>}
*/
abstract Optional<Map<ID,T>> getAll();
/**
* 清除
*/
abstract void clear();
LocalCacheRead本地缓存阅读
public class LocalCacheRead<ID> extends ReadAbstract<Double,ID>{
private ReadCache cache;
/**
* 阅读文摘
*
* @param maxRead 最大阅读数
* @param jobSeconds 定时秒
* @param readStorage 读取存储
*/
public LocalCacheRead(Double maxRead, int jobSeconds, ReadStorage<Double, ID> readStorage,String cacheKey) {
super(maxRead, jobSeconds, readStorage);
cache = new ReadCache(cacheKey);
}
/**
* 增加
*
* @param key 关键
* @param n 阅读数
* @return {@link Double} 返回阅读数
*/
@Override
Optional<Double> increase(ID key, Double n) {
Double aDouble = cache.get(key, Double.class);
if(aDouble == null){
aDouble = 0D;
}
cache.put(key,aDouble + n);
return Optional.ofNullable(aDouble + n);
}
/**
* 减少
*
* @param key 关键
* @param n 阅读数
*/
@Override
void reduce(ID key, Double n) {
Double aDouble = cache.get(key, Double.class);
if(aDouble != null){
if(aDouble <= n){
cache.evict(key);
return;
}
cache.put(key,aDouble - n);
}
}
/**
* 得到所有
*
* @return {@link Map <ID, T>}
*/
@Override
Optional<Map<ID, Double>> getAll() {
return Optional.ofNullable((Map) cache.getNativeCache());
}
/**
* 清除
*/
@Override
void clear() {
cache.clear();
}
/**
* 查询指定key的阅读数
*
* @param key 阅读key
* @return {@link Optional<Double>}
*/
@Override
public Optional<Double> getRead(ID key) {
return Optional.ofNullable(cache.get(key, Double.class));
}
/**
* 查询多个key的阅读数
*
* @param keys 阅读keys
* @return {@link Optional<Map<ID,Double>>}
*/
@Override
public Optional<Map<ID, Double>> getReads(List<ID> keys) {
return Optional.ofNullable((Map)cache.getNativeCache().entrySet().stream().filter(v->keys.contains(v.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
/**
* 阅读室缓存
*/
static class ReadCache extends ConcurrentMapCache{
/**
* Create a new ConcurrentMapCache with the specified name.
*
* @param name the name of the cache
*/
public ReadCache(String name) {
super(name,false);
}
}
}
RedisCacheRead redis缓存阅读
redis缓存有点特殊,使用redis缓存一般都是分布式项目,所以它定时入库的操作是需要加上分布式锁来控制不同节点同时触发
RedisCacheRead<ID> extends ReadAbstract<Double,ID> {
/**
* redis操作类
*/
private final RedisOperations<String,Double> redisOperations;
/**
* redis hash操作
*/
private final BoundHashOperations<String, ID, Double> hash;
/**
* 缓存前缀
*/
private final String cacheKey;
/**
* 搜索key拼接符
*/
private final String SCAN_KEY_PREFIX = "^";
private final String SCAN_KEY_SUFFIX = "$";
private final String SCAN_KEY_JOIN = "|";
private final String value = "0";
/**
* 阅读文摘
* @param maxRead 最大阅读数
* @param jobSeconds 定时秒
* @param readStorage 读取存储
* @param redisOperations redis操作类
*/
protected RedisCacheRead(double maxRead, int jobSeconds, ReadStorage<Double, ID> readStorage, RedisOperations<String, Double> redisOperations) {
this(maxRead,jobSeconds,readStorage,redisOperations,"read:cache:");
}
/**
* 阅读文摘
* @param maxRead 最大阅读数
* @param jobSeconds 定时秒
* @param readStorage 读取存储
* @param redisOperations redis操作类
* @param cacheKey 缓存key
*/
protected RedisCacheRead(double maxRead, int jobSeconds, ReadStorage<Double, ID> readStorage, RedisOperations<String, Double> redisOperations,String cacheKey) {
super(maxRead, jobSeconds, readStorage);
assert readStorage != null : "redis操作类不能为空";
this.redisOperations = redisOperations;
this.cacheKey = cacheKey;
this.hash = redisOperations.boundHashOps(this.cacheKey);
}
/**
* 执行(因redis可能为分布式部署,添加redis锁)
*/
@Override
protected void perform() {
if(this.getLock(cacheKey+":perform_job", (jobSeconds / 2) < 5 ? 5 : (jobSeconds / 2) ,TimeUnit.SECONDS)){
super.perform();
/**
* 释放锁
*/
this.unLock(cacheKey+":perform_job");
}
}
/**
* 增加
*
* @param key 关键
* @param n 阅读数
* @return {@link Double} 返回阅读数
*/
@Override
Optional<Double> increase(ID key, Double n) {
return Optional.ofNullable(hash.increment(key,n));
}
/**
* 减少
*
* @param key 关键
* @param n 阅读数
*/
@Override
void reduce(ID key, Double n) {
hash.increment(key,-n);
}
/**
* 得到所有
*
* @return {@link Map <ID, T>}
*/
@Override
Optional<Map<ID,Double>> getAll() {
return Optional.ofNullable(hash.entries());
}
/**
* 清除
*/
@Override
void clear() {
redisOperations.delete(cacheKey);
}
/**
* 查询指定key的阅读数
*
* @param key 阅读key
* @return {@link Optional<Double>}
*/
@Override
public Optional<Double> getRead(ID key) {
return Optional.ofNullable(hash.get(key));
}
/**
* 查询多个key的阅读数
*
* @param keys 阅读keys
* @return {@link Optional<Map<ID,Double>>}
*/
@Override
public Optional<Map<ID, Double>> getReads(List<ID> keys) {
/**
* 搜索key
*/
StringBuilder sb = new StringBuilder();
/**
* 拼接成正则
*/
keys.forEach(v->{
sb.append(SCAN_KEY_PREFIX).append(v.toString()).append(SCAN_KEY_SUFFIX);
if(keys.indexOf(v) < (keys.size() -1)){
sb.append(SCAN_KEY_JOIN);
}
});
/**
* 搜索
*/
Cursor<Map.Entry<ID, Double>> cursor = hash.scan(new ScanOptions.ScanOptionsBuilder().match(sb.toString()).count(keys.size()).build());
Map<ID, Double> map = new HashMap<>(keys.size());
while (cursor.hasNext()) {
Map.Entry<ID, Double> next = cursor.next();
map.put(next.getKey(), next.getValue());
}
return Optional.ofNullable(map);
}
/**
* 获取锁
* @param key
* @param timeout
* @param timeUnit
* @return
*/
private boolean getLock(String key, long timeout, TimeUnit timeUnit) {
try {
return redisOperations.execute((RedisCallback< Boolean>) connection ->
connection.set(key.getBytes(Charset.forName("UTF-8")), value.getBytes(Charset.forName("UTF-8")),
Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
} catch (Exception e) {
return false;
}
}
/**
* 释放锁
* @param key
*/
private void unLock(String key) {
try {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Boolean unLockStat = redisOperations.execute((RedisCallback<Boolean>)connection ->
connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
key.getBytes(Charset.forName("UTF-8")), value.getBytes(Charset.forName("UTF-8"))));
if (!unLockStat) {
}
} catch (Exception e) {
}
}
}
评论区