gball个人知识库
首页
基础组件
基础知识
算法&设计模式
  • 操作手册
  • 数据库
  • 极客时间
  • 每日随笔
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
  • 画图工具 (opens new window)
关于
  • 网盘 (opens new window)
  • 分类
  • 标签
  • 归档
项目
GitHub (opens new window)

ggball

后端界的小学生
首页
基础组件
基础知识
算法&设计模式
  • 操作手册
  • 数据库
  • 极客时间
  • 每日随笔
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
  • 画图工具 (opens new window)
关于
  • 网盘 (opens new window)
  • 分类
  • 标签
  • 归档
项目
GitHub (opens new window)
  • 面试

  • 数据库

  • linux

  • node

  • tensorFlow

  • 基础组件

  • 基础知识

    • java集合

    • jvm调优

    • java并发编程

      • 进程与线程
      • 多线程的方法介绍与使用
      • 线程死锁
      • 守护线程与用户线程
      • ThreadLocal了解
      • 什么是多线程并发编程
      • unsafe类了解
      • 伪共享
      • 锁的了解
      • ThreadLocalRandom原理解析
      • LongAdder,LongAccumulator类了解
      • 实践-创建多少线程合适
      • 线程通信——通知与等待
        • 那为什么要增加使线程等待,唤醒线程操作吗?
        • 利用notify和await实现阻塞队列(消费队列中的资源)
        • 利用await和signal 将异步转化为同步
        • 引用
      • 缓存一致性问题
      • 利用Excutors异步执行任务
      • 线程池
      • 线程池操作数据库造成死锁
      • Java 浅拷贝和深拷贝的理解和实现方式
      • java内存模型JMM
      • 锁升级过程
      • io模型
      • 关键字介绍
      • AQS解析
    • java网络编程

    • java8新特性

    • javaAgent

    • java高级

  • 算法与设计模式

  • 分布式

  • 疑难杂症

  • go学习之旅

  • 极客时间

  • 知识库
  • 基础知识
  • java并发编程
ggball
2022-08-04

线程通信——通知与等待

# 线程通信-通知与等待

我们知道 在java中 线程的状态 有6种,初始(NEW),运行(runnable),阻塞(bloked),等待(waiting),超时等待(timed_waiting),终止(terminated) Object.wait() 使线程等待,释放锁,Object.notify(),Object.notifyAll() 唤醒等待的线程

# 那为什么要增加使线程等待,唤醒线程操作吗?

我的理解是在多线程中,可以更好的分配资源。就打个比方,我们有个业务A,业务A里面有很多小操作a,b,c等,这些小操作都是需要获取锁操作,这是有线程1过来执行业务A,他先获取到了a的锁,想继续往下获取b的锁,发现b的锁已经被被别的线程获取到了,如果都是使用的synchronized关键字加锁的话,线程A会阻塞在这,如果一直获取不到b的锁,那么线程A也会一直阻塞在这(可能发生死锁了),这是我们大家都不愿意看到的。这样就需要线程之间互相协作,商量好哪个先获取锁哪个后获取锁。

# 利用notify和await实现阻塞队列(消费队列中的资源)

/**
 * @author ggBall
 * @version 1.0.0
 * @ClassName BlockedQueue.java
 * @Description 利用 notify 和 await 实现阻赛队列
 * @createTime 2022年07月05日 11:40:00
 */
public class BlockedQueue<T> {

    private final Lock lock = new ReentrantLock();

    /**
     * 队列不满条件
     */
    private final Condition notFull = lock.newCondition();

    /**
     * 队列不为空条件
     */
    private final Condition notEmpty = lock.newCondition();


    private final Object[] items;

    private int putIndex;

    public BlockedQueue(Object[] items) {
        this.items = items;
        this.putIndex = 0;
    }

    public BlockedQueue(int size) {
        this.items = new Object[size];
        this.putIndex = 0;
    }

    public void enqueue(T t) {
        lock.lock();

        try {
            // 队列已满
            while (putIndex > (items.length-1)) {
                notFull.await();
            }
            System.out.println(t+"入队");
            items[putIndex] = t;
            notEmpty.signal();
            putIndex++;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }

    }

    public void dequeue(){
        lock.lock();

        try {
            // 队列为空
            while (items.length == 0) {
                notEmpty.await();

            }
            System.out.println(Thread.currentThread().getName()+"出队");
            items[--putIndex] = null;
            // await() 和前面我们提到的 wait() 语义是一样的;signal() 和前面我们提到的 notify() 语义是一样的。
            notFull.signal();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    public int actualSize() {
        return putIndex;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockedQueue<String> queue = new BlockedQueue<String>(5);
        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                queue.enqueue(Thread.currentThread().getName());
            },"t"+i).start();
        }

        Thread.sleep(1000);

        while (true) {

            if (queue.actualSize() == 0) {
                System.out.println("完全出队");
                break;
            }
            try {
                Thread.sleep(2000);
                queue.dequeue();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }


        }



    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

20220708172010 针对队列已满,队列为空 条件控制线程的等待和运行,队列为空线程不能再进行消费,队列已满线程无法往其中发送消息。

# 利用await和signal 将异步转化为同步

/**
 * @author ggBall
 * @version 1.0.0
 * @ClassName ConditionDemo.java
 * @Description 异步转同步
 * @createTime 2022年07月08日 15:18:00
 */
public class ConditionDemo {

    private final Lock lock = new ReentrantLock();
    private final Condition don = lock.newCondition();

    public static void main(String[] args) {
        // B异步给A发送消息,B等待 A收到后,处理一段时间会往C处回调,B去C处获取消息
        ConditionDemo conditionDemo = new ConditionDemo();
        conditionDemo.testAsyncToSync();

    }

    public void testAsyncToSync() {

            Receiveer receiveer = new Receiveer();
            // 发送消息
            Thread thread = new Thread(new Msger("test", receiveer));
            thread.start();

            // 获取消息
            String message = receiveer.getMessage();
            System.out.println("message = " + message);


    }



    class Msger implements Runnable {

        private String message;
        private Receiveer receiveer;

        Msger(String message,Receiveer receiveer) {
            this.message = message;
            this.receiveer = receiveer;
        }

        public void send() {
            System.out.println("开始发送消息");
            receiveer.setMessage(message);
        }

        @Override
        public void run() {
            send();
        }
    }

    class Receiveer {
        private String message;

        public void setMessage(String message)  {
            try {
                lock.lock();
                Thread.sleep(3000);
                this.message = message;
                don.signal();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

        }

        public String getMessage() {
            lock.lock();
            try {
                if (!isDone()) {
                    System.out.println("开始等待");
                    don.await();
                    System.out.println("结束等待");
                }

                return message;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }

        public boolean isDone() {
            return "".equals(message);
        }

    }

}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

结果 20220708171731

流程是 发送消息是异步操作,获取消息会使线程等待,直到消息送达之后,才唤醒等待线程。

场景:目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情

# 引用

如何避免重复创建线程?ubbo如何用管程实现异步转同步 (opens new window) 用“等待-通知”机制优化循环等待 (opens new window)

上次更新: 2025/06/04, 15:06:15
实践-创建多少线程合适
缓存一致性问题

← 实践-创建多少线程合适 缓存一致性问题→

最近更新
01
AIIDE
03-07
02
githubActionCICD实战
03-07
03
windows安装Deep-Live-Cam教程
08-11
更多文章>
Theme by Vdoing
总访问量 次 | 总访客数 人
| Copyright © 2021-2025 ggball | 赣ICP备2021008769号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
×

评论

  • 评论 ssss
  • 回复
  • 评论 ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss
  • 回复
  • 评论 ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss
  • 回复
×