线程同步 ->等待机制

当多个线程操作一个对象时,需要线程同步,此对象的线程进入这个对象的等待池形成队列,排队等待

并发:同一个对象被多个线程同时操作,线程不安全

形成条件:队列&锁
访问时加入了锁机制(synchronized),当一个线程获得对象的锁即可独占资源,其他线程需等待


不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class UnsafeBank {
public static void main(String[] args) {
Account account = new Account("张三", 100); // 初始余额 100 元

WithdrawTask t1 = new WithdrawTask(account, 80);
WithdrawTask t2 = new WithdrawTask(account, 80);

new Thread(t1, "小明").start();
new Thread(t2, "小红").start();
}
}

// 账户类
class Account {
String name; // 账户名
int balance; // 余额

public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}
}

// 取钱任务(不加锁)
class WithdrawTask implements Runnable {
private final Account account;
private final int amount;

public WithdrawTask(Account account, int amount) {
this.account = account;
this.amount = amount;
}

@Override
public void run() {
if (account.balance >= amount) {
System.out.println(Thread.currentThread().getName() + " 准备取钱,余额:" + account.balance);
try {
Thread.sleep(100); // 模拟延迟,增加并发冲突概率
} catch (InterruptedException e) {
e.printStackTrace();
}
account.balance -= amount;
System.out.println(Thread.currentThread().getName() + " 取钱成功,剩余余额:" + account.balance);
} else {
System.out.println(Thread.currentThread().getName() + " 取钱失败,余额不足!");
}
}
}

并发:小明和小红取钱时都以为钱够,于是都取了80

同步方法(synchronized)&同步块

同步方法

1
public synchronized void method(int args){}

synchronized方法控制”对象”的访问,每个对象对应一把锁,synchronize方法必须获得调用该方法的对象的锁才能执行,否则线程阻塞

同步块

1
synchronized(Obj){}

修改后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class UnsafeBank {
public static void main(String[] args) {
Account account = new Account("张三", 100); // 初始余额 100 元

WithdrawTask t1 = new WithdrawTask(account, 80);
WithdrawTask t2 = new WithdrawTask(account, 80);

new Thread(t1, "小明").start();
new Thread(t2, "小红").start();
}
}

// 账户类
class Account {
String name; // 账户名
int balance; // 余额

public Account(String name, int balance) {
this.name = name;
this.balance = balance;
}
}

// 取钱任务(不加锁)
class WithdrawTask implements Runnable {
private final Account account;
private final int amount;

public WithdrawTask(Account account, int amount) {
this.account = account;
this.amount = amount;
}

@Override
public void run() {
synchronized (account) { //一次只有一个线程拿到锁
if (account.balance >= amount) {
System.out.println(Thread.currentThread().getName() + " 准备取钱,余额:" + account.balance);
try {
Thread.sleep(100); // 模拟延迟,增加并发冲突概率
} catch (InterruptedException e) {
e.printStackTrace();
}
account.balance -= amount;
System.out.println(Thread.currentThread().getName() + " 取钱成功,剩余余额:" + account.balance);
} else {
System.out.println(Thread.currentThread().getName() + " 取钱失败,余额不足!");
}
}
}
}

死锁

多个线程互相抱着对方所需要的资源,然后形成僵持
不要将synchronized套娃就会死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DeadLock {
public static void main(String[] args) {
new Thread(new Change(0,"A")).start();
new Thread(new Change(1, "B")).start();
}
}

class In {

}

class Out {

}

class Change implements Runnable {
static final In in = new In();
static final Out out = new Out();

int choice;
String name;

public Change(int choice, String name) {
this.choice = choice;
this.name = name;
}

@Override
public void run() {
try {
change();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void change() throws InterruptedException {
if (choice == 1) {
synchronized (in) {
System.out.println(name + "获得进去的锁");
Thread.sleep(1000);
}
synchronized (out) {
System.out.println(name + "获得出去的锁");
}
} else {
synchronized (out) {
System.out.println(name + "获得出去的锁");
Thread.sleep(2000);
}
synchronized (in) {
System.out.println(name + "获得进去的锁");
}
}
}
}


Lock锁

显式定义同步锁
ReentrantLock可重入锁 -> 被创建在需要锁的对象的类中
lock&unlock -> 锁&释放锁


生产者消费者问题

这是一个线程同步问题,生产者与消费者共享同一个资源
生产者:生产商品后通知消费者
消费者:消费商品后通知生产者

Object类方法 ->解决线程通信问题

仅可在同步方法或同步代码块中使用

方法名 作用
wait() 该线程一直等待直到被其他线程唤醒,与sleep不同,会释放锁
wait(long timeout) 指定等待的毫秒数
notify() 唤醒一个正在等待的线程
notifyAll() 唤醒同一个对象上所有wait的线程 按优先级

管程法 –>利用缓冲区

生产者:负责生产数据的模块
消费者:负责处理数据的模块

生产者将生产好的数据放入缓冲区中,消费者从缓冲区中取出数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//管程法
public class TestPC {
public static void main(String[] args) {
SynContainer container = new SynContainer();
new Thread(new Producer(container)).start();
new Thread(new Consumer(container)).start();
}
}

class Producer implements Runnable{
SynContainer container;

public Producer(SynContainer container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 60; i++) {
try {
container.push(new Produce(i)); //生产
System.out.println("生产了第"+i+"个产品");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
//生产者
}

class Consumer implements Runnable{
SynContainer container;

public Consumer(SynContainer container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 70; i++) {
try {
container.pop(); //消费
System.out.println("消费了第"+i+"个产品");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

}

}
//消费者
}

class Produce {
//产品
int id;

public Produce(int id) {
this.id = id;
}
}

class SynContainer {
//缓冲池
int count = 0;
private Produce[] produces = new Produce[10];

synchronized void push(Produce produce) throws InterruptedException { //这里的produce可以看作一个临时的参数 需要将produce成功放入缓冲区 所以传参produce
//生产者生产
if (count == produces.length) {
//生产区满了
this.wait(); //生产者等待消费者消费
}
//若没满丢入产品
produces[count++] = produce;

this.notify(); //生产出一个商品 通知消费者来消费

}

synchronized Produce pop() throws InterruptedException { //需要从缓冲区调用一个对象 所以要有返回值 返回一个Produce类型的对象
if (count == 0) {
//消费完了 等待生产
this.wait();
}
Produce produce = produces[--count];
notify(); //消费完了通知生产者
return produce; //该方法的作用就是返回一个produce的值 因为要从缓冲区中取出produce,然后返回
}
}

信号灯法

设置一个标志位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//信号灯
public class TestPC2 {
public static void main(String[] args) {
Kitchen kitchen = new Kitchen(new Dish());
new Thread(new Guest(kitchen)).start();
new Thread(new Chef(kitchen)).start();
}
}

class Chef implements Runnable{
Kitchen kitchen;
public Chef(Kitchen kitchen) {
this.kitchen = kitchen;
}

@Override
public void run() {
for (int i = 1; i < 21; i++) {
kitchen.cook(i);

}

}
}

class Guest implements Runnable{
Kitchen kitchen;

public Guest(Kitchen kitchen) {
this.kitchen = kitchen;
}

@Override
public void run() {
for (int i = 1; i < 30; i++) {
kitchen.eat(i);
}
}
}

class Dish {
int dishNumber;
}

class Kitchen {
//菜做好了 顾客吃菜 厨师等待做菜 T
//菜没做好 厨师做菜 顾客等待上菜 F

Dish dish;

public Kitchen(Dish dish) {
this.dish = dish;
}

boolean flag = false; //初始无菜

//顾客吃菜
public synchronized void eat(int dishNumber){
while (!flag){
try {
System.out.println("顾客等待");
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("顾客正在吃第"+dishNumber+"道菜");
flag = false;
notifyAll();
}

//厨师做菜
public synchronized void cook(int dishNumber) {

while (flag){
try {
System.out.println("厨师等待");
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("厨师正在做第"+dishNumber+"道菜");
flag = true;
notifyAll(); //吃完了,通知厨师做
}
}

线程池

提前创建好多个线程放在线程池中,使用时直接获取,用完放回池中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
```maximumPoolSize```:最大线程数
```keepAliveTime```:线程没有任务时最多保持多久

利用callable实现
```void execute(Runnable command)```:用来执行Runnable,无返回值
```<T>Future<T> submit(Callable<T> task)```:用来执行Callable,有返回值
```void shutdown()```:关闭线程池

Runnable:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestPool {
public static void main(String[] args) {
//创建服务 创建线程池
ExecutorService ser = Executors.newFixedThreadPool(3);
ser.execute(new MyThread());
ser.execute(new MyThread());
ser.execute(new MyThread());
}
}

class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}

Callable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.*;

public class TestPool2 {
public static void main(String[] args) {
ExecutorService ser = Executors.newFixedThreadPool(3); //创建容积为3的线程池
Future <Boolean> t1 = ser.submit(new MyThread2()); //提交对象到线程池
Future <Boolean> t2 = ser.submit(new MyThread2());
Future <Boolean> t3 = ser.submit(new MyThread2());

//关闭线程池
ser.shutdown();
}
}

class MyThread2 implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
System.out.println(Thread.currentThread().getName());
return true;
}
}