同步操作将从 flatfish/Java-Review 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
源码+官方文档
进程:一个程序。QQ.exe 程序的集合
线程:进程执行的一个更小的单位
一个进程往往可以包含多个线程,至少包含一个
Java默认有个线程,main线程和GC线程
线程:来了一个进程Typora,我可以写字,然后过了一会可以自动保存(线程负责)
Java无法直接操作硬件,因为她是运行在JVM上的,其实是通过C/C++调用的硬件
并发和并行
并发:多个人抢一个资源
并行:多个人一起走
public class Test {
public static void main(String[] args) {
// 获取 CPU 的核数
// CPU密集型 IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU的资源
Java线程有几个状态
创建 、就绪、运行、阻塞、死亡
public enum State {
// 新生
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
// 超时等待
TIMED_WAITING,
// 挂机,终止
TERMINATED;
}
wait/sleep的区别
传统的锁:synchronized
真正的多线程开发,线程就是一个单独的资源类,没有任何的附属操作
其中包含属性和方法
Lock接口
**Lock的三个实现类 **
传统的方式
public class SaleTicketDemo01 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 70; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
ticket.sale();
}
}, "C").start();
}
}
class Ticket {
private int number = 50;
// synchronized 本质就是锁
public synchronized void sale() {
if (number > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " sale " + number-- + " other " + number);
}
}
}
使用Lock锁
public class SaleTicketDemo01 {
public static void main(String[] args) {
Ticket2 ticket2 = new Ticket2();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticket2.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 70; i++) {
ticket2.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
ticket2.sale();
}
}, "C").start();
}
}
class Ticket2 {
private int number = 50;
private Lock lock = new ReentrantLock();
public void sale() {
lock.lock();
try {
if (number > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " sale " + number-- + " other " + number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
公平锁:十分公平,可以先来后到
非公平锁:不公平,要抢,可以插队(默认)
synchronized和Lock的区别
锁是什么?如果判断锁的是谁?
生产者消费者问题 synchronized 版本
/**
* 线程之间的通信问题:生产者和消费者问题
* <p>
* 通知,等待唤醒
* <p>
* 线程交替执行 A B 操作同一个变量 num = 0
* A num + 1
* B num - 1
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
/**
* 口诀:
* 等待 - 业务 - 通知
*/
class Data {
private int num = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (num != 0) {
// 等待操作
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName() + " 进入 increment => " + num);
// 通知 +1 已经完成
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
if (num == 0) {
// 等待操作
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName() + " 进入 decrement => " + num);
// 通知 -1 已经完成
this.notifyAll();
}
}
问题存在 A B C D 四个线程 就会出现虚假唤醒的行为
此时的代码应该使用 while 替换 if
/**
* 线程之间的通信问题:生产者和消费者问题
*
* 通知,等待唤醒
*
* 线程交替执行 A B 操作同一个变量 num = 0
* A num + 1
* B num - 1
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
/**
* 口诀:
* 等待 - 业务 - 通知
*/
class Data {
private int num = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (num != 0) {
// 等待操作
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName() + " 进入 increment => " + num);
// 通知 +1 已经完成
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
while (num == 0) {
// 等待操作
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName() + " 进入 decrement => " + num);
// 通知 -1 已经完成
this.notifyAll();
}
}
Juc版本的生产者和消费者的问题
代码实现
public class B {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
/**
* 口诀:
* 等待 - 业务 - 通知
*/
class Data2 {
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
while (num != 0) {
// 等待操作
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + " 进入 increment => " + num);
// 通知 +1 已经完成
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (num == 0) {
// 等待操作
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + " 进入 decrement => " + num);
// 通知 -1 已经完成
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
任何一个新的技术,绝对不是仅仅覆盖了原来的技术,优势和补充
Condition 精准的通知和唤醒 上述的代码没有实现精准通知和唤醒,它是无序的
编写一段代码,顺序打印十次 A B C
/**
* A 执行完调用 B B 执行完调用C
*/
public class C {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data3.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 13; i++) {
try {
data3.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 12; i++) {
try {
data3.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}
}
class Data3 {
private Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
private static int index = 0;
public synchronized void printA() throws InterruptedException {
while (index != 0) {
this.wait();
}
System.out.println("Data3.printA");
index = 1;
this.notifyAll();
}
public synchronized void printB() throws InterruptedException {
while (index != 1) {
this.wait();
}
System.out.println("Data3.printB");
index = 2;
this.notifyAll();
}
public synchronized void printC() throws InterruptedException {
while (index != 2) {
this.wait();
}
System.out.println("Data3.printC");
index = 0;
this.notifyAll();
}
}
代码测试:
public class D {
public static void main(String[] args) {
Data4 data4 = new Data4();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data4.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data4.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data4.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}
}
class Data4 {
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
// 1 A 2B 3C
private int num = 1;
public void printA() throws InterruptedException {
lock.lock();
try {
// 业务代码 判断,执行 通知
while (num != 1) {
condition1.await();
}
System.out.println("AAAAA");
num = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() throws InterruptedException {
lock.lock();
try {
// 业务代码 判断,执行 通知
while (num != 2) {
condition2.await();
}
System.out.println("BBBBB");
num = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() throws InterruptedException {
lock.lock();
try {
// 业务代码 判断,执行 通知
while (num != 3) {
condition3.await();
}
System.out.println("CCCCC");
num = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
如何判断锁的是谁。永远的知道什么锁,锁到底是谁的锁!对象,Class
/**
* 8锁,就是关于锁的8个问题
* 1、标准情况下 两个线程先打印 发短信还是打电话 1/发短信 2/打电话
* 2、sendMessage 延迟4秒 两个线程先打印 发短信还是打电话 1/发短信 2/打电话
*/
public class Test1 {
public static void main(String[] args) throws Exception {
Phone phone = new Phone();
// 锁的存在
new Thread(() -> {
phone.sendMessage();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.callMessage();
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者
// 两个方法用的是同一把锁,谁先拿到谁先执行
// 而且synchronized 是独占锁
public synchronized void sendMessage() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Phone.sendMessage");
}
public synchronized void callMessage() {
System.out.println("Phone.callMessage");
}
}
/**
* 3、增加了一个普通方法 先打印sendMessage还是hello 1/sendMessage 2/hello
* 4、两个对象,两个同步方法,发短信还是电话 1/callMessage 2/sendMessage 不同的锁,对象不一样,就看执行的时间
*/
public class Test2 {
public static void main(String[] args) throws Exception {
// 两个对象
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
// 锁的存在
new Thread(() -> {
phone1.sendMessage();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.callMessage();
}, "B").start();
}
}
class Phone2 {
// synchronized 锁的对象是方法的调用者
// 两个方法用的是同一把锁,谁先拿到谁先执行
// 而且synchronized 是独占锁
public synchronized void sendMessage() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Phone.sendMessage");
}
public synchronized void callMessage() {
System.out.println("Phone.callMessage");
}
// 这里没有锁,不受锁的影响
public void hello() {
System.out.println("Phone2.hello");
}
}
/**
* 5、增加两个静态同步方法 只有一个对象 1/发短信 2/打电话
* 6、增加两个静态同步方法 2个对象 1/发短信 2/打电话
*/
public class Test3 {
public static void main(String[] args) throws Exception {
// 两个对象
Phone3 phone1 = new Phone3();
Phone3 phone2 = new Phone3();
// 锁的存在
new Thread(() -> {
phone1.sendMessage();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.callMessage();
}, "B").start();
}
}
// Phone3 唯一的一个 class 对象
class Phone3 {
// synchronized 锁的对象是方法的调用者
// static 静态方法
// 类一加载就有了,锁的是Class对象
// 所以此时两个对象是同一把锁,也就是Class,
public static synchronized void sendMessage() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Phone.sendMessage");
}
public static synchronized void callMessage() {
System.out.println("Phone.callMessage");
}
}
/**
* 7. 一个静态的同步方法,1个普通的同步方法,1个对象,先打印发短信还是打电话?
* 8、一个静态的同步方法,1个普通的同步方法,2个对象,先打印发短信还是打电话?
*/
public class Test4 {
public static void main(String[] args) throws Exception {
// 两个对象
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
// 锁的存在
new Thread(() -> {
phone1.sendMessage();
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.callMessage();
}, "B").start();
}
}
class Phone4 {
// 锁的是Class
public static synchronized void sendMessage() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Phone.sendMessage");
}
// 锁的是对象
public synchronized void callMessage() {
System.out.println("Phone.callMessage");
}
}
小结
new this 具体的一个手机
static Class 唯一的一个模板
List
并发修改异常
/**
* java.lang.UnsupportedOperationException 并发修改异常
*
*/
public class ListTest {
public static void main(String[] args) {
// 并发下 ArrayList是不安全的
// 解决方案:
// List<String> list = new Vector<>();
// List<String> list1 = Collections.synchronizedList(new ArrayList<>());
// CopyOnWriteArrayList<String> list1 = new CopyOnWriteArrayList<>();
// CopyOnWrite 写入时复制 COW思想 计算机程序设计领域的一种优化策略
// 多个线程调用的时候,list 读取的时候,固定的,写入/ 覆盖
// 在写入的时候避免覆盖,造成数据问题
// CopyOnWriteArrayList 比 Vector 强在哪里?使用 Lock锁
CopyOnWriteArrayList<String> list1 = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list1.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list1);
}, String.valueOf(i)).start();
}
}
}
/**
* 同理可得 java.util.ConcurrentModificationException
* 解决方案1: Set<String> set = Collections.synchronizedSet(new HashSet<String>());
* 解决方案2:CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
// HashSet<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<String>());
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 4));
System.out.println(set);
}).start();
}
}
}
HashMap 线程不安全的
/**
* java.util.ConcurrentModificationException
*/
public class MapTest {
public static void main(String[] args) {
// map 是这样使用的吗?
//
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
// Map<String, String> map = new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 6));
System.out.println(map);
}).start();
}
}
}
代码实现
public class CallableTest {
public static void main(String[] args) throws Exception {
// new Thread(new Runnbale()).start();
// new Thread(new FutureTask<V>()).start();
MyCallableTest myCallableTest = new MyCallableTest();
// 适配类
FutureTask<String> task = new FutureTask<>(myCallableTest);
// 如果对象相同,结果会被缓存,提高效率
new Thread(task,"A").start();
new Thread(task,"B").start();
// 获取返回结果
// get 方法可能会有阻塞,因为要等待线程执行结束返回结果 将其放在最后
// 异步通信来处理
String res = task.get();
System.out.println(res);
}
}
class MyCallableTest implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("call able ");
return "hello world";
}
}
细节
CountDownLatch
/**
* 计数器 -1
*/
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
// 总数是6 倒计时
// 在必须要执行任务的时候 再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
// 数量 -1
System.out.println(Thread.currentThread().getName() + " GO !");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 等待计数器归零,然后再向下执行
countDownLatch.await();
System.out.println("Close door");
}
}
原理:
每次有线程调用 countDown() 数量 -1 ,假设计时器变为0,countDownLatch.await() 就会被唤醒
CyclicBarrier
加法计数器
/**
* 集齐7龙珠召唤神龙
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + temp);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore 信号量
public class SemaphoreTest {
public static void main(String[] args) {
// 线程数量:停车位 限流
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
// semaphore.acquire(); 获得
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到车位");
TimeUnit.SECONDS.sleep(2);
// semaphore.release(); 释放
System.out.println(Thread.currentThread().getName() + " 抢到车位");
System.out.println(Thread.currentThread().getName() + " 离开车位");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i + 1)).start();
}
}
}
原理:
作用:多个共享资源互斥的使用 并发限流,控制最大的线程数
/**
* 读写锁
* 读 - 读
* 读 - 写
* 写 - 写
*/
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache2 myCache = new MyCache2();
// 写入操作
for (int i = 0; i < 140; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "", new Object());
}, String.valueOf(i + " W")).start();
}
// 读取操作
for (int i = 0; i < 140; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
}, String.valueOf(i + " R")).start();
}
}
}
/**
* 自定义缓存
*/
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
// 存 写
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + " put " + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " put ok");
}
// 取 读
public void get(String key) {
System.out.println(Thread.currentThread().getName() + " get " + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + " get ok");
System.out.println(o);
}
}
/**
* 自定义缓存2
* 加锁的
*/
class MyCache2 {
private volatile Map<String, Object> map = new HashMap<>();
// 读写锁 更加细粒度的控制
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 存 写 只希望通知是一个去读
public void put(String key, Object value) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " put " + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " put ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
// 取 读
public void get(String key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " get " + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + " get ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
}
阻塞
队列
BlockingQueue
什么时候会使用阻塞队列:线程池,多线程并发原理
队列的使用
添加、移除
四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞异常 | 超时等待 |
---|---|---|---|---|
添加 | add(ele) | offer(ele) | put(ele) | offer(ele,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
判断队列首部 | element() | peek() | - | - |
/**
* 抛出异常
* java.lang.IllegalStateException: Queue full
* java.util.NoSuchElementException
*/
public static void test1() {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.add("D"));
}
/**
* 不抛出异常
* 添加 不抛出异常 返回 false
* 没有元素可弹出,就返回 null
*/
public static void test2() {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
/**
* 等待 阻塞
*/
public static void test3() throws Exception {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// 一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
/**
* 等待 超时退出
*/
public static void test4() throws Exception {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
blockingQueue.offer("d", 2, TimeUnit.SECONDS);
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
SynchronousQueue:同步队列
/**
* 同步队列
*
* 和其他的BlockingQueue 不一样,不能存储元素
* 放进去的必须取出来,也可以理解就只能放一个元素
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<Object> sync = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
sync.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
sync.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
sync.put("3");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 取出 " + sync.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 取出 " + sync.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " 取出 " + sync.take());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
线程池:三大方法、7大参数、4种拒绝策略
池化技术:就是为了
程序的运行本质:就是占用系统资源!优化资源的使用,所以就演变出来一种策略,就是池化技术
线程池、连接池、内存池、对象池
线程池的好处:
线程可以复用、可以控制最大并发数、管理线程
线程池:三大方法
/**
* 工具类 Executors
*/
public class Demo01 {
public static void main(String[] args) {
// 单个线程
// ExecutorService service = Executors.newSingleThreadExecutor();
// 可以伸缩的线程池
// ExecutorService service = Executors.newCachedThreadPool();
// 固定的线程池大小
ExecutorService service = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 100; i++) {
service.execute(() -> {
System.out.println("Demo01.main " + Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
service.shutdown();
}
// 线程池用完 线程池结束 关闭线程池
}
}
七大参数
// 单一线程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 可以伸缩的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 所有的本质
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
/**
* 工具类 Executors
* 异常 java.util.concurrent.RejectedExecutionException
*/
public class Demo02 {
public static void main(String[] args) {
// 单个线程
ExecutorService service = new ThreadPoolExecutor(
4,
7,
5,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
// 满了 还有人进来 不处理这个人,抛出异常
new ThreadPoolExecutor.AbortPolicy()
);
try {
// 最大承载:Deque + MaxSize
// 超过了最大承载,就报错 拒绝策略异常
// java.util.concurrent.RejectedExecutionException
for (int i = 0; i < 1000; i++) {
service.execute(() -> {
System.out.println("Demo01.main " + Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
service.shutdown();
}
// 线程池用完 线程池结束 关闭线程池
}
}
四种拒绝策略
AbortPolicy
CallerRunsPolicy
DiscardOldestPolicy
DiscardPolicy
池的最大线程到底应该如何定义
CPU密集型 几核就定义为几,可以保证CPU数量
// 获取CPU的核数
int num = Runtime.getRuntime().availableProcessors();
IO密集型 程序有很多任务,io十分占用资源
大于 判断你程序中的十分耗IO的线程
新时代的程序员:lambda表达式、函数式接口、链式接口、Stream流式计算
函数式接口,只有一个方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// 简化编程模型
// foreach(消费者类的函数式接口)
Function
代码测试
Function接口
/**
* Function 函数式接口
* 有一个 输入参数 一个输出
* 只要是函数式接口,都可以使用 lambda 表达式简化
*/
public class Demo01 {
public static void main(String[] args) {
Function function = new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
};
Function function1 = x -> {
return "324" + x;
};
Object apply = function1.apply("23");
System.out.println(apply);
}
}
Predicate 断定型接口:传入一个值,返回 false 或者 true
/**
* Predicate<T> 传入一个对象,放回 true 或者 false
*/
public class Demo02 {
public static void main(String[] args) {
Predicate predicate = new Predicate<String>() {
@Override
public boolean test(String s) {
if ("hello".equals(s)) {
return true;
}
return false;
}
};
Predicate predicate2 = (x) -> {
if ("hello".equals(x)) {
return true;
}
return false;
};
System.out.println(predicate.test("hello"));
System.out.println(predicate.test("hello 2orl"));
System.out.println("--------------------");
System.out.println(predicate2.test("hello"));
System.out.println(predicate2.test("hello 2orl"));
}
}
Consumer 消费者接口
/**
* Consumer 消费型接口:只有输入 没有返回值
*/
public class Demo03 {
public static void main(String[] args) {
Consumer<String> consumer = o -> {
System.out.println(o);
};
consumer.accept("o?o");
}
}
Supplier 供给型接口
没有参数,只有返回值
/**
* Supplier
*/
public class Demo04 {
public static void main(String[] args) {
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "hello world";
}
};
Supplier<String> supplier2 = () -> {
return "???";
};
System.out.println(supplier.get());
System.out.println(supplier2.get());
}
}
什么是Stream流式计算
大数据:存储+计算
集合、MySQL存储本质就是存储东西的
计算都应该交给流来做
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private int id;
private String name;
private int age;
}
/**
* 题目要求:一分钟完成此题,只能用一行代码实现
* 现在有5个用户,需要进行筛选
* <p>
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(6, "e", 25);
// 集合就是存储
List<User> users = Arrays.asList(u1, u2, u3, u4, u5);
// 计算交给流
// 链式编程
users
.stream()
.filter((user) -> {
return (user.getId() & 1) == 0;
})
.filter((user) -> {
return user.getAge() > 23;
})
.map((user) -> {
return user.getName().toUpperCase();
})
.sorted((user1, user2) -> {
return user2.compareTo(user1);
})
.limit(1)
.forEach(System.out::println);
}
}
什么是 ForkJoin
ForkJoin 在JDK1.7,并行执行任务 提高效率
大数据:Map reduce (把大任务划分为小任务)
特点:
ForkJoin
/**
* 求和计算的任务、
* 如何使用 forkJoin
* 1、forkjoinpool 通过它来执行
* 2、计算任务 execute(ForkJoinTask<?> task)
* 3、计算类必须继承 ForkJoinTest
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
// 临界值
private long temp = 1000L;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
Long sum = 0L;
if ((end - start) < temp) {
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 分支合并计算
long middle = (start + end) / 2;
ForkJoinDemo forkJoin1 = new ForkJoinDemo(start, middle);
// 拆分任务 把任务压入线程队列
forkJoin1.fork();
ForkJoinDemo forkJoin2 = new ForkJoinDemo(middle + 1, end);
// 拆分任务 把任务压入线程队列
forkJoin2.fork();
return forkJoin1.join() + forkJoin2.join();
}
}
}
public class ForkJoinTest {
public static void main(String[] args) {
test1();
test2();
test3();
}
/**
* 普通程序员
* <p>
* sum = 500000000500000000 时间:7134 ms
*/
public static void test1() {
long start = System.currentTimeMillis();
Long sum = 0L;
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
}
/**
* 中级程序员
* <p>
* sum = 500000000500000000 时间:3859 ms
*/
public static void test2() {
long start = System.currentTimeMillis();
// ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1, 10_0000_0000);
// forkJoinPool.execute(forkJoinDemo);
// Long sum = 0L;
Long sum = forkJoinDemo.compute();
//
// try {
// sum = forkJoinDemo.get();
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
}
/**
* 高级程序员
*
* sum = 500000000500000000 时间:830 ms
*/
public static void test3() {
long start = System.currentTimeMillis();
// Stream 并行流
long sum = LongStream
.rangeClosed(0L, 10_0000_0000L)
.parallel()
.reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
}
}
Future
public class Demo01 {
public static void main(String[] args) {
try {
// 发起一个请求
// 没有返回值的异步回调 runAsync
// 有返回值的异步回调 supplyAsync
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("???");
});
System.out.println(111);
System.out.println(voidCompletableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Demo02 {
public static void main(String[] args) {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "???";
});
try {
System.out.println(stringCompletableFuture.whenComplete((t, u) -> {
System.out.println(t + " " + u);
}).exceptionally((e) -> {
e.printStackTrace();
return "exception";
}).get());
String s = stringCompletableFuture.get();
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
8种操作
// 源码
public enum Single {
INSTANCE;
public static Single getInstance() {
return INSTANCE;
}
}
class Test {
public static void main(String[] args) throws Exception {
System.out.println(Single.getInstance());
System.out.println(Single.INSTANCE);
Constructor<Single> constructor = Single.class.getConstructor(null);
constructor.setAccessible(true);
/**
* 说明 枚举没有这个构造方法 IDEA 骗人了
*
* Exception in thread "main"
* java.lang.NoSuchMethodException: cn.icanci.unsafe.Single.<init>()
*
* public enum Single {
* INSTANCE;
* private Single() {
* }
* public static Single getInstance() {
* return INSTANCE;
* }
* }
*
*
* 枚举不能创建对象
*
* if ((clazz.getModifiers() & Modifier.ENUM) != 0)
* throw new IllegalArgumentException("Cannot reflectively create enum objects");
*/
Single single = constructor.newInstance();
System.out.println(single);
}
}
// 字节码
package cn.icanci.unsafe;
public enum Single
{
INSTANCE;
private Single() {}
public static Single getInstance()
{
return INSTANCE;
}
}
// 反编译之后的文件
package cn.icanci.unsafe;
public final class Single extends Enum
{
public static Single[] values()
{
return (Single[])$VALUES.clone();
}
public static Single valueOf(String name)
{
return (Single)Enum.valueOf(cn/icanci/unsafe/Single, name);
}
private Single(String s, int i)
{
super(s, i);
}
public static Single getInstance()
{
return INSTANCE;
}
public static final Single INSTANCE;
private static final Single $VALUES[];
static
{
INSTANCE = new Single("INSTANCE", 0);
$VALUES = (new Single[] {
INSTANCE
});
}
}
public enum Single {
INSTANCE;
public static Single getInstance() {
return INSTANCE;
}
}
class Test {
public static void main(String[] args) throws Exception {
System.out.println(Single.getInstance());
System.out.println(Single.INSTANCE);
Constructor<Single> constructor = Single.class.getDeclaredConstructor(String.class,int.class);
constructor.setAccessible(true);
/*
* 枚举不能创建对象
*
* if ((clazz.getModifiers() & Modifier.ENUM) != 0)
* throw new IllegalArgumentException("Cannot reflectively create enum objects");
*/
Single single = constructor.newInstance();
System.out.println(single);
}
}
// Exception in thread "main" java.lang.IllegalArgumentException: Cannot reflectively create enum object at java.lang.reflect.Constructor.newInstance(Constructor.java:417)
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
MyThread myThread1 = new MyThread(lockA,lockB);
MyThread myThread2 = new MyThread(lockB,lockA);
new Thread(myThread1).start();
new Thread(myThread2).start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockA + " => " + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockB + " => " + lockA);
}
}
}
}
面试中,排除问题
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。