前言 无言。
继续复盘。
复盘 消息队列选型?kafka和rabbitmq对比?
RabbitMq
RocketMq
Kafka
开发语言
erlang
Java
Java
单机吞吐
万级
万级
十万级
延时
微秒
毫秒
毫秒
消息重复
可控制
可能会有重复
持久化
内存,文件
磁盘
事务
不支持
支持
优点
性能较好,支持amqp
阿里开源,性能非常好,在阿里内部大规模应用。支持多种模式,集群消费,广播消费等。
高吞吐量,低延时,稳定性高,消息有序
缺点
erlang语言开发,不利于扩展
阿里开源的东西,说不定什么时候社区会停止维护。
社区更新较慢,不支持延迟,重试等。
如果需要日志采集追求高吞吐量,那么采用kafka;Rabbitmq使用简单,但是不利于二次开发。Rocketmq背靠阿里,成也如此败也如此,阿里的开源贡献很大,但是很多项目稳定后社区经常陷入停滞,不过阿里内部既然在使用,那么说明它的性能和可靠性有保证。
xxljob原理 xxl-job是一个分布式的定时任务调度平台。主要分为admin和executor
xxl-job其实也是在quartz的基础上实现的,但是修改了任务调度的模式,并且任务调度采用注册和RPC调用方式来实现 。2.1.0版本前核心调度模块都是基于quartz
框架,2.1.0版本开始自研调度组件,移除quartz
依赖 ,使用时间轮调度。
xxl_job_info
表是记录定时任务 的db表,里面有个trigger_next_time(Long)
字段,表示下一次触发的时间点任务时间被修改 / 每一次任务触发后,可以根据cronb
表达式计算下一次触发时间戳:Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date()))
定时执行任务逻辑:
定时任务scheduleThread
:不断从db
把5秒
内要执行的任务读出,立即触发 / 放到时间轮等待触发 ,并更新trigger_next_time
.
获取当前时间now
轮询db
,找出trigger_next_time
在距now 5秒
内的任务,对到达now时间后的任务(超出now 5秒外)直接跳过不执行(调度过期,如果有调度过期策略则触发执行)或者重置trigger_next_time
;对到达now
时间后的任务(超出now
5秒内),开线程执行触发逻辑,若任务下一次触发时间是在5秒内,则放到时间轮内(Map<Integer, List>秒数(1-60) => 任务id列表),重置trigger_next_time
;对未到达now
时间的任务,直接放到时间轮内并重置trigger_next_time
。
定时任务ringThread
:时间轮实现到点触发任务。时间轮数据结构:Map<Integer, List<Integer>> key
是秒数(1-60)
,value
是任务id
列表。
获取当前时间秒数
从时间轮内移出当前秒数前2个秒数(避免处理耗时太长,跨过刻度,向前校验一个刻度)的任务列表id,一一触发任务;
如何避免集群中的多个服务器同时调度任务?
当xxl-job应用本身集群部署(实现高可用HA)时,通过mysql悲观锁实现分布式锁(for update语句)
setAutoCommit(false)
关闭隐式自动提交事务,启动事务
select lock for update
(显式排他锁,其他事务无法进入&无法实现for update
)
读db
任务信息 -> 拉任务到内存时间轮 -> 更新db
任务信息
commit
提交事务,同时会释放for update
的排他锁(悲观锁)
es优势 前一篇应该说过了,用作全文搜索,相较于mysql会快很多。es是document格式的存储,mysql是行格式的,所以es并不需要显式定义字段。mysql由于其索引实现(innodb为例)导致在数据量大到一定级别后会出现性能衰减;而es只要内存足够就没太大问题。插入速度上如果正确的配置mysql其性能并不低,当然相对于正常状态es而言还是差了一个到多个量级(es>mongo>mysql)。查询速度这个主要看索引和数量,在需要复杂关联查询的时候建议优先考虑mysql。资源开销上,当数据量上去了后如果为了维持性能的话,es的占用内存是十分夸张的。
去掉redis广播怎么通知各服务 从广播推模式改为存在数据库中,各个服务扫表实现。
编程题:两个线程交替打印AB。三个线程交替打印ABC。 被CSDN坑了一把。
相互唤醒 这种写法有的问题是最后会卡住,其实改造下wait的条件就行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class Main { public static void main (String[] args) { Print print = new Print(10 ); new Thread(print::printA).start(); new Thread(print::printB).start(); } public static class Print { private boolean flag = false ; private int count; private int countA; private int countB; public Print (int count) { this .count = count; } public synchronized void printA () { while (!flag && countA++ < count) { this .notify(); System.out.println("A" ); flag = true ; try { this .wait(); }catch (Exception ignored) { } } } public synchronized void printB () { while (flag && countB++ < count) { this .notify(); System.out.println("B" ); flag = false ; try { this .wait(); }catch (Exception ignored) { } } } } }
lock版,本质上也是synchronized和wait/notify 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 Object lock = new Object(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { synchronized (lock) { lock.notify(); System.out.println("A" ); try { lock.wait(); }catch (Exception ignored) { } } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { synchronized (lock) { lock.notify(); System.out.println("B" ); try { lock.wait(); }catch (Exception ignored) { } } } }).start();
lock版 三线程 在网上看到这种写法,不建议,易读性十分不好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public static class PrintABCWithLock { public static void main (String[] args) throws Exception { Object lockA = new Object(); Object lockB = new Object(); Object lockC = new Object(); Thread a = new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { synchronized (lockC) { synchronized (lockA) { System.out.println("A" ); lockA.notifyAll(); } try { lockC.wait(); } catch (Exception ignored) { } } } }); Thread b = new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { synchronized (lockA) { synchronized (lockB) { System.out.println("B" ); lockB.notifyAll(); } try { lockA.wait(); } catch (Exception ignored) { } } } }); Thread c = new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { synchronized (lockB) { synchronized (lockC) { System.out.println("C" ); lockC.notifyAll(); } try { lockB.wait(); } catch (Exception ignored) { } } } }); a.start(); Thread.sleep(100 ); b.start(); Thread.sleep(100 ); c.start(); a.join(); } }
真正的锁版。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public static class PrintWithLock { private static int state = 0 ; private static final Lock lock = new ReentrantLock(); private static final Condition A = lock.newCondition(); private static final Condition B = lock.newCondition(); private static final Condition C = lock.newCondition(); public static void main (String[] args) throws Exception { new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { lock.lock(); try { while (state % 3 != 0 ) { A.await(); } System.out.println("A" ); state++; B.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { lock.lock(); try { while (state % 3 != 1 ) { B.await(); } System.out.println("B" ); state++; C.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { lock.lock(); try { while (state % 3 != 2 ) { C.await(); } System.out.println("C" ); state++; A.signal(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } } }).start(); } }
去掉不需要的condition 和atomicInteger差不多。内部都在循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public static class PrintWithLock { private static int state = 0 ; private static final Lock lock = new ReentrantLock(); public static void main (String[] args) throws Exception { new Thread(() -> { for (int i = 0 ; i < 10 ;) { lock.lock(); try { while (state % 3 == 0 ) { System.out.println("A" ); state++; i++; } } finally { lock.unlock(); } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ;) { lock.lock(); try { while (state % 3 == 1 ) { System.out.println("B" ); state++; i++; } } finally { lock.unlock(); } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ;) { lock.lock(); try { while (state % 3 == 2 ) { System.out.println("C" ); state++; i++; } } finally { lock.unlock(); } } }).start(); } }
AtomicInteger版本 比较费cpu,因为在循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static class PrintWithCas { private static final AtomicInteger state = new AtomicInteger(0 ); public static void main (String[] args) { new Thread(() -> { for (int i = 0 ; i < 10 ; ) { if (state.get() % 3 == 0 ) { System.out.println("A" ); state.compareAndSet(state.get(), state.get() + 1 ); i++; } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; ) { if (state.get() % 3 == 1 ) { System.out.println("B" ); state.compareAndSet(state.get(), state.get() + 1 ); i++; } } }).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; ) { if (state.get() % 3 == 2 ) { System.out.println("C" ); state.compareAndSet(state.get(), state.get() + 1 ); i++; } } }).start(); } }
信号量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public static class PrintWithSemaphore { private static final Semaphore A = new Semaphore(1 ); private static final Semaphore B = new Semaphore(0 ); private static final Semaphore C = new Semaphore(0 ); public static void main (String[] args) { new Thread(() -> { try { for (int i = 0 ; i < 10 ; i++) { A.acquire(); System.out.println("A" ); B.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { for (int i = 0 ; i < 10 ; i++) { B.acquire(); System.out.println("B" ); C.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { for (int i = 0 ; i < 10 ; i++) { C.acquire(); System.out.println("C" ); A.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }