GVKun编程网logo

python 并发编程 多路复用IO(python3并发)

4

这篇文章主要围绕python并发编程多路复用IO和python3并发展开,旨在为您提供一份详细的参考资料。我们将全面介绍python并发编程多路复用IO的优缺点,解答python3并发的相关问题,同时

这篇文章主要围绕python 并发编程 多路复用IOpython3并发展开,旨在为您提供一份详细的参考资料。我们将全面介绍python 并发编程 多路复用IO的优缺点,解答python3并发的相关问题,同时也会为您带来IO通信模型(三)多路复用IO、JAVA I/O(六)多路复用IO、Java 并发编程 - 并发难点及解决方法、java 并发编程 CountDownLatch 使用模型 CountDownLatch + redis的实用方法。

本文目录一览:

python 并发编程 多路复用IO(python3并发)

python 并发编程 多路复用IO(python3并发)

 

 

多路复用IO(IO multiplexing)

这种IO方式为事件驱动IO(event driven IO)。

我们都知道,select/epoll的好处就在于单个进程process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

select是多路复用的一种

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket, 当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。 这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用\(select和recvfrom\), 而blocking IO只调用了一个系统调用\(recvfrom\)。但是,用select的优势在于它可以同时处理多个connection

 

多路复用IO比较阻塞IO模型:

1.阻塞IO经历两个阶段 wait data,copy data

2.多路复用3个阶段 wait data,ready copy data, copy data

单连接套接字通信 阻塞IO效率高

多路复用IO select可以代理多个套接字连接,多个套接字通信,多路复用IO效率高

 

强调:

1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

结论: select的优势在于可以处理多个连接,性能高,同时可以检测多个套接字IO行为,不适用于单个连接

 

select网络IO模型示例

select 检测多个套接字IO行为 accept,recv

IO行为两种:

1.别人给我传数据

2.给别人发送数据

 

timeout是超时时间

每隔0.5秒去问操作系统准备好数据没有

 

def select(rlist,wlist,xlist,timeout=None): 
    pass


# [] 传的空列表是出异常的列表
# 返回值3个列表 收列表,发列表,异常列表
rl,wl,xl = select.select(rlist,[],0.5)

 

 

客户端:

from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8000))

while True:
    msg = input(">>>:").strip()
    if not msg:continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))


client.close()

 

服务端代码:

from socket import *
import select

server = socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1,8000))
server.listen(5)
# 设置socket接口为 非阻塞IO接口
# 默认是True 为阻塞
server.setblocking(False)

# 专门存着收消息套接字
rlist = [server,]
# 存放发送消息套接字
wlist = []
# 存放发送的数据
wdata = {}
while True:

    # 返回值3个列表 收列表,发列表,异常列表
    rl,0.5)
    print("rl",rl)
    print("wl",wl)

    for sock in rl:
        if sock == server:
            conn,addr = sock.accept()
            rlist.append(conn)
        else:
            try:
                data = sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue

                # 收的套接字加到列表
                wlist.append(sock)
                # 把数据加到字典 做一个 套接字对应数据
                wdata[sock] = data.upper()

            except Exception:
                sock.close()
                rlist.remove(sock)


    # 发送数据
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

server.close()

基于select模块 检测套接字IO行为,实现并发效果

 

 select监听fd变化的过程分析:

用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,
就会发送信号给用户进程数据已到;
用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,
这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

 

 

该模型的优点:

可以同时检测多个套接字,效率比阻塞IO,非阻塞IO高了

相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 cpu,同时能够为多客户端提供服务。
如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

 

 该模型的缺点:

代理的套接字 列表里的多个套接字,需要循环列表 一个个检测,

在代理套接字比较少的情况下,循环比较快。但select代理的套接字非常多的情况下,select随着列表增大,效率就越来越慢

首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。
很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了
/dev/poll,…。 如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异, 所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。 其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

epoll是异步方式实现,提交套接字时候,每个套接字身上都绑定一个回调函数,哪个套接字准备好了,就触发回调函数,把自己索引放在单独列表里,对于select来说,只需要去准备好的列表里 根据索引拿到套接字,这样不需要在列表里每个遍历。

epoll不支持windows系统

多路复用IO

IO通信模型(三)多路复用IO

IO通信模型(三)多路复用IO

多路复用IO

非阻塞同步IO的介绍中可以发现,为每一个接入创建一个线程在请求很多的情况下不那么适用了,因为这会渐渐耗尽服务器的资源,人们也都意识到了这个 问题,因此终于有人发明了IO多路复用。最大的特点就是不需要开那么多的线程和进程多路复用IO是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。

<!-- more -->

如图,这样在处理多个连接时,可以只需要一个线程监控就绪状态,对就绪的每个连接开一个线程处理就可以了,这样需要的线程数大大减少,减少了内存开销和上下文切换的CPU开销。

多路复用IO有几个比较重要的概念,下面一一讲解。

缓冲区Buffer

Buffer本质是可以写入可以读取的内存,这块内存被包装成了NIO的Buffer对象,然后为它提供一组用于访问的方法。Java则为java.nio.Buffer实现了基本数据类型的Buffer

所有的Buffer缓冲区都有4个属性,具体解释可以看表格。

属性 描述
Capacity 容量,可以容纳的最大数据量,不可变
Limit 上届,缓冲区当前数据量,Capacity=>Limit
Position 位置,下一个要被读取或者写入的元素的位置,Capacity>=Position
Mark 标记,调用mark()来设置mark=position,再调用reset()设置position=mark

这4个属性遵循大小关系: mark <= position <= limit <= capacity

Buffer的基本用法

使用Buffer读写数据一般遵循以下四个步骤:

  1. 写入数据到Buffer
  2. 调用flip()方法。
  3. 从Buffer中读取数据。
  4. 调用clear()方法或者compact()方法。

Buffer的测试代码

下面是对于Java中ByteBuffer的测试代码:

        // 申请一个大小为1024bytes的缓冲buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        System.out.println("申请到的Buffer:"+byteBuffer);

        // 写入helloworld到buffer
        byteBuffer.put("HelloWorld".getBytes());
        System.out.println("写入HelloWorld到Buffer:"+byteBuffer);

        // 切换为读模式
        byteBuffer.flip();
        // 当前Buffer已存放的大小
        int length = byteBuffer.remaining();
        byte[] bytes = new byte[length];

        // 读取bytes长度的数据
        byteBuffer.get(bytes);
        System.out.println("从buffer读取到数据:"+new String(bytes,"UTF-8"));

        // 切换为compact 清空已读取的数据
        byteBuffer.compact();
        System.out.println("读取后的Buffer:"+byteBuffer);

得到如下输出:

申请到的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
写入HelloWorld到Buffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
从buffer读取到数据:HelloWorld
读取后的Buffer:java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]

需要说明的是flip()方法将Buffer从写模式切换到读模式,clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。

Buffer的读写模式

注意读写模式切换时候几个标记位的变化。

通道Channel

通道Channel和流类似,不同的是通道的工作模式可以是全双工。也就是说既可以读取,也可以写入。同时也可以异步的进行读写。Channel连接着底层数据与缓冲区Buffer。 同样的,Java中针对不同的情况实现了不同的Channel操作类。常用的有

  1. FileChannel 从文件中读写数据。
  2. DatagramChannel 能通过UDP读写网络中的数据。
  3. SocketChannel 能通过TCP读写网络中的数据。
  4. ServerSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

下面是对于Java中Channel和Buffer的简单演示:

    // 申请一个大小为1024bytes的缓冲buffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     // 初始化Channel数据
    FileInputStream fis = new FileInputStream("f:/test.txt");
    FileChannel channel = fis.getChannel();
    System.out.println("Init Channel size:" + channel.size());
     // 从channel中读取数据
    int read = channel.read(byteBuffer);
    System.out.println("Read Size :" + read);
    System.out.println("byteBuffer:"+byteBuffer);
     // 切换到读取模式
    byteBuffer.flip();
     // 输出byteBuffer内容
    System.out.print("print byteBuffer:");
    while (byteBuffer.hasRemaining()){
        System.out.print((char) byteBuffer.get());
    }
     byteBuffer.clear();
    System.out.println(byteBuffer);
    fis.close();

输出信息如下:

Init Channel size:10
Read Size :10
byteBuffer:java.nio.HeapByteBuffer[pos=10 lim=1024 cap=1024]
print byteBuffer:helloworld

需要注意的是,在读取之前一定要调用flip()切换到读取模式。

选择器Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。我们也可以称Selector为轮询代理器,事件订阅器或者channel容器管理器。 应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。

关于IO事件,我们可以在SelectionKey类中找到几个常用事件:

  1. OP_READ 可以读取
  2. OP_WRITE 可以写入
  3. OP_CONNECT 已经连接
  4. OP_ACCEPT 可以接受

值得注意的是,在程序中都是通过不断的轮训已经注册的Channel,根据检查注册时的感兴趣事件是否已经就绪来决定是否可以进行后续操作。同时Selector也有几个经常使用的方法。

  1. select() 阻塞到至少有一个通道在你注册的事件上就绪了。

  2. select(long timeout) 最长会阻塞timeout毫秒

  3. selectNow() 会阻塞,不管什么通道就绪都立刻返回

  4. selectedKeys() 返回就绪的通道

下面是一个对Java中Selector编写服务端的简单使用测试(客户端不在此编写了,如有需要,可以查看IO通信模型(一)同步阻塞模式BIO(Blocking IO)中的客户端代码):


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * <p>
 * NIO-Selector
 * 选择器的使用测试
 * Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读
 * 写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接
 * 。我们也可以称Selector为轮询代理器,事件订阅器或者channel容器管理器。
 * 应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些
 * IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。
 *
 * @Author niujinpeng
 * @Date 2018/10/26 15:31
 */
public class NioSelector {

    public static void main(String[] args) throws IOException {
        // 获取channel
        ServerSocketChannel channel = ServerSocketChannel.open();
        // channel是否阻塞
        channel.configureBlocking(false);
        // 监听88端口
        ServerSocket socket = channel.socket();
        socket.bind(new InetSocketAddress(83));


        // 创建选择器Selector
        Selector selector = Selector.open();
        // 像选择器中注册channel
        channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 阻塞到有一个就绪
            int readyChannel = selector.select();
            if (readyChannel == 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                // 是否可以接受
                if (selectionKey.isAcceptable()) {
                    System.out.println("准备就绪");
                    SelectableChannel selectableChannel = selectionKey.channel();
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    // 注册感兴趣事件-读取
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
                } else if (selectionKey.isConnectable()) {
                    System.out.println("已连接");

                } else if (selectionKey.isReadable()) {
                    System.out.println("可以读取");

                } else if (selectionKey.isWritable()) {
                    System.out.println("可以写入");

                }
            }
        }
    }
}

Java NIO编程

到这里,已经对多路复用IO有了一个基本的认识了,可以结合上面的三个概念就行多路复用IO编程了,下面演示使用Java语言编写一个多路复用IO服务端。 NioSocketServer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * <p>
 * 使用Java NIO框架,实现一个支持多路复用IO的服务器端
 *
 * @Author niujinpeng
 * @Date 2018/10/16 0:53
 */
public class NioSocketServer {
    /**
     * 日志
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NioSocketServer.class);

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 是否阻塞
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        // 服务器通道只能注册SelectionKey.OP_ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
            if (selector.select(100) == 0) {
                //LOGGER.info("本次询问selector没有获取到任何准备好的事件");
                continue;
            }

            // 询问系统,所有获取到的事件类型
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey readKey = selectionKeys.next();
                // 上面获取到的readKey要移除,不然会一直存在selector.selectedKeys()的集合之中
                selectionKeys.remove();

                SelectableChannel selectableChannel = readKey.channel();
                if (readKey.isValid() && readKey.isAcceptable()) {
                    LOGGER.info("--------------channel通道已经准备完毕-------------");
                    /*
                     * 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
                     * 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
                     * 否则无法监听到这个socket channel到达的数据
                     * */
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    registerSocketChannel(socketChannel, selector);
                } else if (readKey.isValid() && readKey.isConnectable()) {
                    LOGGER.info("--------------socket channel 建立连接-------------");
                } else if (readKey.isValid() && readKey.isReadable()) {
                    LOGGER.info("--------------socket channel 数据准备完成,可以开始读取-------------");
                    try {
                        readSocketChannel(readKey);
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            }
        }

    }

    /**
     * 在server socket channel接收到/准备好 一个新的 TCP连接后。
     * 就会向程序返回一个新的socketChannel。<br>
     * 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
     * 所以程序还没法通过selector通知这个socket channel的事件。
     * 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
     * socket channel感兴趣的事件
     *
     * @param socketChannel
     * @param selector
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) {
        // 是否阻塞
        try {
            socketChannel.configureBlocking(false);
            // 读模式只能读,写模式可以同时读
            // socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
            socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(2048));
        } catch (IOException e) {
            LOGGER.info(e.toString(), e);
        }

    }

    private static void readSocketChannel(SelectionKey readKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel) readKey.channel();
        //获取客户端使用的端口
        InetSocketAddress sourceSocketAddress = (InetSocketAddress) clientSocketChannel.getRemoteAddress();
        int sourcePort = sourceSocketAddress.getPort();

        // 拿到这个socket channel使用的缓存区,准备读取数据
        // 解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
        ByteBuffer contextBytes = (ByteBuffer) readKey.attachment();
        // 通道的数据写入到【缓存区】
        // 由于之前设置了ByteBuffer的大小为2048 byte,所以可以存在写入不完的情况,需要调整
        int realLen = -1;
        try {
            realLen = clientSocketChannel.read(contextBytes);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
            clientSocketChannel.close();
            return;
        }

        // 如果缓存中没有数据
        if (realLen == -1) {
            LOGGER.warn("--------------缓存中没有数据-------------");
            return;
        }

        // 将缓存区读写状态模式进行切换
        contextBytes.flip();
        // 处理编码问题
        byte[] messageBytes = contextBytes.array();
        String messageEncode = new String(messageBytes, "UTF-8");
        String message = URLDecoder.decode(messageEncode, "UTF-8");

        // 接受到了"over"则清空buffer,并响应,否则不清空缓存,并还原Buffer写状态
        if (message.indexOf("over") != -1) {
            //清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
            contextBytes.clear();
            LOGGER.info("端口【" + sourcePort + "】客户端发来的信息:" + message);
            LOGGER.info("端口【" + sourcePort + "】客户端消息发送完毕");
            // 响应
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("Done!", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else {
            LOGGER.info("端口【" + sourcePort + "】客户端发来的信息还未完毕,继续接收");
            // limit和capacity的值一致,position的位置是realLen的位置
            contextBytes.position(realLen);
            contextBytes.limit(contextBytes.capacity());
        }
    }
}

多路复用IO优缺点

  • 不需要使用多线程进行IO处理了
  • 同一个端口可以处理多种协议
  • 多路复用IO具有操作系统级别的优化
  • 其实底层还都是同步IO

文章代码已经上传GitHub:https://github.com/niumoo/java-toolbox/

<完> 本文原发于个人博客:https://www.codingme.net 转载请注明出处

JAVA I/O(六)多路复用IO

JAVA I/O(六)多路复用IO

在前边介绍Socket和ServerSocket连接交互的过程中,读写都是阻塞的。套接字写数据时,数据先写入操作系统的缓存中,形成TCP或UDP的负载,作为套接字传输到目标端,当缓存大小不足时,线程会阻塞。套接字读数据时,如果操作系统缓存没有接收到信息,则读线程阻塞。线程阻塞情况下,就不能处理其他事情。JDK1.4引入了通道和选择器的概念,以支持异步或多路复用的IO。

Unix系统中的select()方法可以实现异步IO,可以给该Selector注册多个描述符(可读或可写),然后对这些描述符进行监控。在Java中,描述符即为套接字Socket。

如JAVA I/O(二)文件NIO中对选择器的介绍,在非阻塞模式下,用select()方法检测发生变化的通道,每个通道都关联一个Socket,用一个线程实现多个客户端的请求,从而实现多路复用。

 

 

1. 简单实例

服务器端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

public class MultiJabberServer1 {

    public static final int PORT = 8080;
    
    public static void main(String[] args) throws IOException{
        
        String encoding = System.getProperty("file.encoding");
        Charset cs = Charset.forName(encoding);
        ByteBuffer buffer = ByteBuffer.allocate(16);
        SocketChannel ch = null;//Socket对应的channel
        //1.创建ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //2.创建选择器Selector
        Selector sel = Selector.open();
        
        try {
            //3.设置ServerSocketChannel通道为非阻塞
            ssc.configureBlocking(false);
            //4.ServerSocketChannel关联Socket,用于监听连接,使用本地ip和port
            //注意:Socket也对通道进行了改造,直接调Socket.getChannel()将返回bull,除非通过下边与通道关联
            //the expression (ssc.socket().getChannel() != null) is true
            ssc.socket().bind(new InetSocketAddress(PORT));
            //5.将通道注册到Selector,感兴趣的事件为  连接  事件
            ssc.register(sel, SelectionKey.OP_ACCEPT);
            System.out.println("Server on port: " + PORT);
            while(true) {
                //6.没有事件发生时,一直阻塞等待
                sel.select();
                //7.有事件发生时,获取Selector中所有SelectorKey(持有选择器与通道的关联关系)。
                //由于基于操作系统的poll()方法,当有事件发生时,只返回事件个数,无法确定具体通道,故只能对所有注册的通道进行遍历。
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                //8.遍历所有SelectorKey,处理事件
                while(it.hasNext()) {
                    SelectionKey sKey = it.next();
                    it.remove();//防止重复处理
                    //9.判断SelectorKey对应的channel发生的事件是否socket连接
                    if(sKey.isAcceptable()) {
                        //10.与ServerSocket.accept()方法相似,接收到该通道套接字的连接,返回SocketChannel,与客户端进行交互
                        ch = ssc.accept();
                        System.out.println(
                                "Accepted connection from:" + ch.socket());
                        //11.设置该SocketChannel为非阻塞模式
                        ch.configureBlocking(false);
                        //12.将该通道注册到Selector中,感兴趣的事件为OP_READ(读)
                        ch.register(sel, SelectionKey.OP_READ);
                    }else {
                        //13.发生非连接事件,此处为OP_READ事件。SelectorKey获取注册的SocketChannel,用于读写
                        ch = (SocketChannel)sKey.channel();
                        //14.将数据从channel读到ByteBuffer中
                        ch.read(buffer);
                        CharBuffer cb = cs.decode((ByteBuffer)buffer.flip());
                        String response = cb.toString();
                        System.out.print("Echoing : " + response);
                        //15.再将获取到的数据会写给客户端
                        ch.write((ByteBuffer)buffer.rewind());
                        if(response.indexOf("END") != -1)
                            ch.close();
                        buffer.clear();
                    }
                }
            }
        } finally {
            if(ch != null)
                ch.close();
            ssc.close();
            sel.close();
        }
    }
}

 如代码中注释标明,大致步骤包含:

  • 创建ServerSocketChannel和Selector,设置通道非阻塞,并与服务端的Socket绑定
  • 注册 ServerSocketChannel到Selector,感兴趣的事件为OP_CONNECT(获取连接)
  • select()方法阻塞等待,直到有事件发生
  • 遍历Selector中的所有注册事件,通过SelectorKey维护Selector和Channel关联关系
  • 如果是连接事件,则调ServerSocketChannel.accept()方法获取SocketChannel,与客户端交互
  • 如果是读事件,则通过SelectorKey中获取SocketChannel,读写数据

运行结果:

Server on port: 8080

客户端

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import com.test.socketio.JabberServer;

/**
 * 采用这种方式,读与写是非阻塞的
 * 普通的读写是阻塞的,直到读完或写完
 *
 */
public class JabberClient1 {
    
    static final int clPot = 8899;

    public static void main(String[] args) throws IOException{
        //1.创建SocketChannel
        SocketChannel sc = SocketChannel.open();
        //2.创建Selector
        Selector sel = Selector.open();
        try {
            sc.configureBlocking(false);
            //3.关联SocketChannel和Socket,socket绑定到本机端口
            sc.socket().bind(new InetSocketAddress(clPot));
            //4.注册到Selector,感兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE
            sc.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            int i = 0;
            boolean written = false, done = false;
            String encoding = System.getProperty("file.encoding");
            Charset cs = Charset.forName(encoding);
            ByteBuffer buffer = ByteBuffer.allocate(16);
            while(!done) {
                sel.select();
                //5.从选择器中获取所有注册的通道信息(SelectionKey作为标识)
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while(it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    //6.获取通道,此处即为上边创建的channel
                    sc = (SocketChannel)key.channel();    
                    //7.判断SelectorKey对应的channel发生的事件是否socket连接,并且还没有连接
                    if(key.isConnectable() && !sc.isConnected()) {
                        InetAddress addr = InetAddress.getByName(null);
                        //连接addr和port对应的服务器
                        boolean success = sc.connect(new InetSocketAddress(addr, JabberServer.PORT));
                        if(!success)
                            sc.finishConnect();
                    }
                    //8.读与写是非阻塞的:客户端写一个信息到服务器,服务器发送一个信息到客户端,客户端再读
                    if(key.isReadable() && written) {
                        if(sc.read((ByteBuffer)buffer.clear()) > 0) {
                            written = false;
                            String response = cs.decode((ByteBuffer)buffer.flip()).toString();
                            System.out.println(response);
                            if(response.indexOf("END") != -1)
                                done = true;
                        }
                    }
                    if(key.isWritable() && !written) {
                        if(i < 10)
                            sc.write(ByteBuffer.wrap(new String("howdy " + i + "\n").getBytes()));
                        else if(i == 10){
                            sc.write(ByteBuffer.wrap("END".getBytes()));
                        }
                        written = true;
                        i++;
                    }
                }
            }
        } finally {
            sc.close();
            sel.close();
        }
    }
}

客户端与服务端类似,不同之处:

  • 创建SocketChannel通道,注册到选择器,刚兴趣的事件为OP_CONNECT、OP_READ、OP_WRITE
  • 调试发现,客户端sel.select()不会阻塞,对注册通道不断的遍历,并且每次都可写。原因是OP_WRITE事件会持续生效,即只要连接存在就可以写,不管服务端是否有返回
  • 本例中,客户端发送一条数据,服务端接收一条,并返回给客户端;客户端接到服务端的消息后,才会发生下一条数据,主要通过written标识进行控制的。

运行机制

运行结果

服务端

Server on port: 8080
Accepted connection from:Socket[addr=/127.0.0.1,port=8899,localport=8080]
Echoing : howdy 0
Echoing : howdy 1
Echoing : howdy 2
Echoing : howdy 3
Echoing : howdy 4
Echoing : howdy 5
Echoing : howdy 6
Echoing : howdy 7
Echoing : howdy 8
Echoing : howdy 9
Echoing : END

客户端

howdy 0
howdy 1
howdy 2
howdy 3
howdy 4
howdy 5
howdy 6
howdy 7
howdy 8
howdy 9
END

2.核心类分析

(1)通道(SelectableChannel)

 通道Channel继承体系如下,其中ServerSocketChannel和SocketChannel都继承自SelectableChannel。

  • SelectableChannel通道可以通过Selector实现多路复用(multiplexed)。
  • 通道通过register(Selector,int,Object)方法注册到Selector中,并返回SelectorKey(代表注册到Selector上的注册信息)。
  • 在一个Selector中,同一个通道只能注册一份;是否可以注册到多个Selector中,由程序调用isRegistered()方法决定。
  • SelectableChannel通道是线程安全的。
  • SelectableChannel包含阻塞和非阻塞两种模式,只有非阻塞时才可以注册到Selector中。

ServerSocketChannel(A selectable channel for stream-oriented listening sockets.),用于监听Socket的基于流的可选通道。

SocketChannel(A selectable channel for stream-oriented connecting sockets.),用于连接Socket的基于流额可选通道。

(2)选择器(Selector)

Selector是SelectableChannel的多路复选器,该类包含以下方法。

  • 通过open()方法创建Selector
  • 包含三种SelectorKey Set:所有注册的SelectorKey、被选的SelectorKey(通道发生事件)、被取消的SelectorKey(不可直接访问)
  • 每次select()操作,都会从被选的SelectorKey集合中删除或新增,清楚被取消的SelectorKey中的SelectorKey

(3)选择建(SelectorKey)

  选择键封装了特定的通道特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。包括读、写、连接和接收操作,如下:

public static final int OP_READ = 1 << 0;

    public static final int OP_WRITE = 1 << 2;

    public static final int OP_CONNECT = 1 << 3;

    public static final int OP_ACCEPT = 1 << 4;

3.Reactor设计模式

基于Selector的多路复用IO,机制是采用Reactor设计模式,将一个或多个客户的服务请求分离(demultiplex)和事件分发器 (dispatch)给应用程序(I/O模型之三:两种高性能 I/O 设计模式 Reactor 和 Proactor),即通过Selector阻塞等待事件发生,然后再分发给相应的处理器接口。详情可以参考该篇文章或更多的资料。

摘自链接文章中的一幅图如下:

  • Reactor是调度中心,包含select()阻塞,等待事件发生,并分发不同的业务处理。
  • 客户端请求连接时,select()接收到事件后,会调acceptor,创建连接并与客户端交互。
  • 客户端写数据给服务端时,select()接收到事件后,调read操作,读取客户端数据,可以采用线程池对与客户端交互,对数据进行处理。
  • 服务端可也以发生数据给客户端。

4.总结

  1. SelectableChannel(ServerSocketChannel和SocketChannel)可以注册到Selector中,并用选择键(SelectorKey)进行分装

  2. SelectorKey中包含选择器感兴趣的事件(读、写、连接和接收)

  3. Selector中select()方法阻塞,直到注册通道有事件发生,可以一个线程监控多个客户端,实现多路复用

  4. 基于Selector的多路复用采用Reactor设计模式,使得选择器与业务处理进行分离。

  5. Netty是异步基于事件的应用框架,其实现是基于Java NIO的,并对其进行了优化,可以进一步学习。

5. 参考

《Thinking in Enterprise Java》

Java NIO系列教程(六) 多路复用器Selector

Java 并发编程 - 并发难点及解决方法

Java 并发编程 - 并发难点及解决方法

Java 并发编程 - 并发难点及解决方法

前言

本文的目的是有一篇视角不一样的文章,通过技术推理,思维演进的方式来理解技术问题以及解决方案,相关的问题和方案存在着一定的通用性;适用于了解大部分基础的读者,用来从宏观视角思考,需要了解线程的基础操作,cas,aqs 等基础,本文不会介绍具体类的适用,实现的源码分析等重复性工作。此文从包含重要基础概念,与少量实现解析,会从源代码以及书籍中抽取核心概念,实现范式概括。阅读需要一定的前提条件,也可以作为部分书籍的所以然快速补充;

不才之处还请不吝赐教,不足之处会持续补充,不明之处欢迎留言;未经允许,禁止转载.

并发难点

顺序性

因为多个 CPU 执行多个线程,所以谁先谁后就很重要,比如 CPU1 否则读取,CPUB 负责计算,那么计算要保证在读取之后.

就好像你买菜,你媳妇做饭,那么做饭这件事要等待买菜完成,但是两件事是不同的两个人用不同的环境实现的.

现实世界的解决方案就是:你媳妇等你买菜,你买完菜了通知你媳妇可以开始做饭了。这就需要你媳妇等待做饭,你通知她做饭. 此为等待通知机制

可见性

因为 CPU 读取数据的时候有缓存,写出数据的时候有缓冲区,所以 CPU1 读写完的数据并不一定能被 CPU2 看到结果.

就好像你媳妇给你和你老弟发短信去买白菜,如果你们俩不带电话,记在脑子里 (缓存数据), 那么你媳妇此时改变主意了,要买黄瓜了 (其他线程修改数据), 你就不知道。所以需要带上电话,你媳妇改变注意的时候,你俩就不知道,就会买错菜.

现实世界的解决方案就是,你们带上电话,你媳妇改变主意了给你们再发信息,此为广播通知 , 你们收到消息,知道改变主意了,在采购的时候重新查看一下短信,获取最新的购买需求,此为缓存一致.

原子性

因为多个 CPU 执行多个线程,所以无法保证一个数据的读写只能被一个 CPU 的一个线程执行,中间可能有其他 CPU 的其他线程干预.

就好像你媳妇做好的菜饭,需要放到橱柜,此时菜能吃,但是要防止你那个调皮的儿子来吐口水,如果吐了口水,则不能吃了.

现实世界的解决方案就是,给你儿子锁屋子里,但是锁太大了 (总线锁), 妨碍了你儿子玩你,所以锁了橱柜 (缓存锁), 你儿子仅仅不能碰那些大米饭.

解决方法

对某个资源加锁来保证原子性

多个操作者操作一个资源,需要对资源上锁,处理完再解锁。这样可以保证数据原子操作,就好像互联网常用的分布式锁 ,CPU 也会有总线锁 / 缓存锁.

锁定的是公共资源

从现实世界来说,我们在使用公共资源的时候,通常都会独占某公共资源;但是你使用你的胳膊,嗓子,眼睛的时候,这是你的私有资源,不会影响别人,除非别人想摘掉你的胳膊,否则你的私有资源使用与别人无关.

注意锁的粒度

就好像你把你儿子关起来,还是把饭菜锁起来一样,锁需要有一个粒度,很明显的是,在入库的时候我们不会让整个系统其他所有入库操作都禁止,而是当前操作的那个仓库被锁住,想想商城里的公共卫生间不会因为有人进入卫生间,而锁住整个商场不让其他人进出.

同样,CPU 在上锁的时候有两个选择①将整个 CPU 操作的总线锁定,②将会涉及到的缓存锁定。很明显锁定缓存让锁的粒度更小,其他 CPU 受到的影响也小.

因此我们要知道,锁的粒度会对系统性能有很大影响.

用比较和替换 (CAS) 来保证原子性

其实计算机中的数据只有读写两种操作,原子性操作就是保证你读取和写入之间没有别人写入。我们可以在写入的时候比较一下,如果是从当初你读取的值写入到当前值,则是中间没有操作入过,否则就是有操作写入过。这就是 cas 操作,比较后替换.

就好像你听见儿子说他饿了 (读取儿子的状态), 你去拿来奶喂它 (写入充饥的数据), 如果你去拿奶的时候你小舅子来照顾过你的儿子给他奶了,你再喂他,他可能就吃多了撑得慌,为了避免这种情况,你需要喂他之前再问问你儿子 "小家伙,你还饿么?" 如果跟之前的信息 (饿) 不一致,则放弃喂奶.

用排队等待解决顺序性

多个操作的顺序乱序,可以让他们排队操作,此时需要等待通知来实现。首先要锁定这个资源,然后争抢使用这个资源的操作需要排队.

等待的一定是个公共资源

并且你只会在你所等待的公共资源的队列上排队;就好像你在烧烤店门口排队,那么你等待的资源就是烧烤店,而等待卫生间的人不会到烧烤店门口排队,所以你们俩之前不竞争,而是排队等待卫生间的那些人存在竞同一资源。换言之:这个世界上肯定是你占用了某资源才造成排队,比如公共电话亭,比如公共卫生间.

被动等待

就好像很多人去银行排队取钱,银行会给你一个号 (排队), 到你了用喇叭喊你 (广播通知), 你听见广播了 (唤醒) 就去柜台,在你拿到排队号的时候,你没得选,保安要求你必须排队等待。这是被动的等待,由 Jvm 给我们提供的 shynchronized 关键字.

主动等待

但是你在取钱的时候发现银行现在没钱了,那么你有两个选择,①放弃取钱,②等待总行来送钱。此时你会重新排队,但是队列不一样,是一个等待拿钱就走的队列,那么此时进入等待是你主动的,在运钞车来的时候,会通知等着钱的这个队列的人,给他们现金然后让他们走人。由 jdk 为我们提供的 Object.wait 和 Object.notify 方法.

一定要获取锁才能主动等待

因为你一定要独占了这个资源,才有能力分辨出这个资源是否符合你操作的需要,否则别人也操作这个资源,你的判断可能就不准确.

就好像在卫生间门口排队,自然就是等待,而你进入卫生间才会发现里面没有纸,此时你已经获取了卫生间的使用权 (关门,上锁) 才能发现,那么你很可能让那些不需要纸就能上厕所的人先用,所以你出来了并重新排队 (wait). 所以你看,主动等待的时候会释放锁.

由此可见排队等待必须在某个资源上排队,也是唤醒排队某个资源的等待队列,所以 synchronized 关键字要对某个对象加锁,Object.wait 和 Object.notify 也是针对某个对象上的队列.

让缓存失效保证可见性

每个操作之间有缓存,需要通过广播让缓存失效,失效之后要重新获取下最新数据。比如通过 MQ 发布一个主题消息,通知其他机器某个缓存失效了,订阅主题的机器收到消息之后将本地的缓存标记失效,这是分布式系统上的,从 CPU 微观来说,连接 MQ 的网线是 CPU 的总线,类似的机制实现就是 CPU 缓存一致性.

我们最好不用 (也最好不要) 每次失效都重新读取数据,因为如果在时间点 1->2 之间失效 100 次,那么则将重新获取 100 次数据,而这 100 次获取数据都不一定会被读取一次。就好像你 10 台机器上都有用户信息,但是不一定每台的用户信息都会被读取.(此为惰性缓存)

宏观来看

如果你不了解什么是微服务,不知道 zookeeper, 不知道分布式锁,不知道消息队列服务器,可以不看这些.

你完全可以将上面这些过程想象成很多个微服务实例下,拥有自己的本地缓存,在操作某个缓存数据的时候,会在 zookeeper 上加一个分布式锁,处理完成的时候会在 mq 上某个主题发布消息,通知缓存被修改,其他实例收到消息后,将本地缓存标记失效,在下次读取数据操作的时候,重新加载缓存.

这个时候你就发现,原本的 CPU 总线换成了网线传输数据,原本 CPU 的 1,2,3 级缓存变成了微服务的本地缓存,原本的缓存锁变成了分布式锁.

但是这还不够,有的时候用户拿到数据等了很久才提交,所以我们的数据上会有一个版本号,每次提交增加版本号,每次写回数据库的时候比较当前数据库版本和用户提交的版本,这就是 cpu 上的 cas 操作.

如何实现

比较替换的实现

在 CPU 层面为我们提供了 cmpxchg 指令来实现,这是个 IA-32 的汇编指令.

缓存加锁的实现

在 CPU 层面为我们提供了 lock 指令前缀来实现,这个指令会锁定总线 / 缓存,然后独享数据的去修改,并且修改完之后会将当前 CPU 缓冲区写会主存。这是个 IA-32 的汇编指令.

缓存失效的实现

在 CPU 层面为我们提供了总线广播监听机制,这个机制可以监听缓存的失效,并将其标记失效,下次看到失效的缓存则从主存读取,这个机制的学名为 MESI (缓存一致性协议).

线程等待的实现

等待有两种方式,一个是一直等待,一个是超时等待,一直等待直接将线程保存起来,等到有唤醒的时候再调度执行就行。超时等待则需要一个定时器.

定时器的具体实现是 OS 层面的东西,比较复杂,但是现在很成熟,同时有硬件层面的支持。硬件层面有一个固定时钟和计数器,每一个时钟周期将计数器递减,计数器为 0 的时候触发 CPU 中断,这看上去很消耗 CPU, 因为每次 CPU 的周期都需要检查。也就是说操作系统级别的等待是硬件级别的操作,所以消耗是很小很小的.

其实无论如何都离不开不断检查,从现实世界来说,如果我们想等待到某时间点,唯一的办法 (不考虑闹钟等) 就是不断的检查当前时间和任务列表。不可能有其他办法。考虑其实等待到某时间点和等待一段时间其实都是一样的,所以只需要一个定时检查的计时器就可以了。这就是为什么一个线程能等待一段时间.

  • 等到某时间点 = 当前时间 + 需要等待的一段时间
  • 等待一段时间 = 最终时间点 - 当前时

排队的实现思路

有了计时器,我们在加上排队,就可以实现排队 - 等待 - 通知 , 排队的实现需要一个队列,不管你是基于链表,还是数组,总之需要一个排队的地方,现实世界中有排队的地方,就有插队的人,不同的是看管理队列的人允不允许插队而已.

此时有两种实现思路,公平排队和非公平排队.

  1. 非公平排队就好像排队吃饭,门口拥堵了一大堆人,老板每次有空桌出来叫一个人,谁挤进去就是谁,这样挤得快的人马上进去吃饭,是不公平的。因为可能有人等了半天,就因为一不小心走神,就被你捷足先登了.
  2. 公平排队拿到号码之后老板按号叫人,到你了你就进去,这样一定公平,因为一定是有序的。谁先来谁先进,但是同时会牺牲排队的效率,因为你可能昨晚没睡好,叫你进去的时候你磨蹭半天没反应,这就浪费时间了,而且老板还要维护这样一堆的号码牌.

Java 层面的实现

Jvm 级别

首先 Java 的线程跟 OS 线程是 1:1 映射关系,所以底层会有 OS 的系统调用库来实现线程创建,销毁,启动,等待操作,然后封装成 Thread 类操作。提供 synchronized 实现对一个对象资源的锁定操作,提供 volatile 实现对 CPU 缓存的锁定和缓冲区 flush 的操作.

Jdk 级别

然后在 JDK 层面,为我们提供了 Object.wait 和 Object.notify 方法与 synchronized 配合,来实现排队 - 等待 - 通知机制.

CPU 级别的 CAS 操作,jdk 提供了 UNSAFE 类来操作,这个类里面有很多 CAS 操作的方法.

JUC 级别

最后在 JUC 包内,为我们提供了 LockSupport 类,让我们可以直接操作一个线程进入等待,和唤醒线程;有了 LockSupport 类,便可以向 OS 操作线程一样去实现高级的排队 - 等待 - 通知 - 唤醒等操作,于是提供了 AQS 线程同步框架.

有了 AQS 线程同步框架,便可以利用将线程排队 - 等待这些操作,实现非常灵活的 Lock 类.

编程实现范式

普通等待通知

// 获得锁,判断条件是否满足,不满足,则等待
// 一个线程也能在没有被通知、中断或超时的情况下唤醒,也即所谓的“虚假唤醒”,虽然这点在实践中很少发生,应用应该检测导致线程唤醒的条件,并在条件不满足的情况下继续等待,以此来防止这一点。
           synchronized (obj) {
               while (<condition does not hold>)
                   obj.wait(timeout);
           }
// 摘录来自: jdk 源代码 wait方法注释

普通等待通知且支持超时

// 获得锁,计算结束时间,循环判断条件是否满足,等待剩余时间时间,被唤醒之后检查条件,并且重新计算等待时间
public synchronized Object get(long mills) throws InterruptedException {
       long future = System.currentTimeMillis() + mills;
       long remaining = mills;
       // 当超时大于0并且result返回值不满足要求
       while ((result == null) && remaining > 0) {
              wait(remaining);
              remaining = future - System.currentTimeMillis();
       } 
   return result;
}
// 摘录来自: 方腾飞,魏鹏,程晓明 著. “Java并发编程的艺术 (Java核心技术系列)。” Apple Books. 

具体的实现类

Object 类

Object 类提供了线程的基础等待通知实现方法,这些方法要求在 synchronized 关键字范围内的代码段执行,即:获取了 jvm 提供的锁.

之所以定义在 Object 方法中,一部分原因因为 synchronized 锁是在某个对象头加锁,换言之就是锁定某个对象资源。然后进行排队等待等调度,之所以这样设计,是因为必须要有一个资源上加锁之后才能进入等待队列,所以你需要锁定这个对象,然后调用这个对象上的 wait-notify, 来实现某个资源的锁定和等待通知机制.

  1. wait 方法需要先获取对象的锁,因为 wait 需要先释放持有的锁,然后进入该对象的等待队列。
  2. notify 方法需要先获取对象的锁,因为 notify 方法会在锁的对象对应的等待队列唤醒(一个或全部)线程。

Thread 类

  1. static currentThread 获取当前线程
  2. static yield 建议调度器让出 CPU
  3. static sleep 让当前线程睡眠
  4. interrupt 设置线程的中断标志位为 true
  5. static interrupted 判断线程中断标识位,并且清除标志位,换言之:如果调用两次该方法,第二次一定返回 false
  6. isInterrupted 判断线程中断标识位,但是不会影响标志位状态.
  7. isAlive 表示当前线程启动了,但是没有运行结束
Thread.join 实现原理

该方法是一个同步方法,使用一个 this.wait 的循环来实现当前线程的等待,while 的条件是当前线程还存活,wait 导致调用线程的等待,在 join 的线程结束的时候会调用 this.notifyAll 来唤醒当前等待的线程,这个方法是 jvm 负责调用。源代码中有一句话:不建议在程序中的线程实例中使用 wait,notify 或者 notifyAll.

It is recommended that applications not use wait, notify, or notifyAll on Thread instances.

ThreadLocal 类

作用

ThreadLocal 可以在当前线程存储一个变量。核心原理是在当前线程对象下创建一个 map, 然后将当前 ThreadLocal 的实例作为 key 存储变量.

关键点

  1. ThreadLocal 跟 Thrad 类都属于 java.lang, 所以默认的成员变量可以互相访问.
  2. 在 Thread 类中存放一个 threadLocals 属性,该属性是 ThreadLocal 下的静态内部类.
  3. ThreadLocal 在调用 set,get 方法的时候,会去初始化这个属性.
  4. ThreadLocal 将自己的 this 作为 key 保存自己想要存放的变量到 threadLocal 属性.

安全隐患 如果线程没有被销毁,那么线程内部的 threadLocals 变量将会一直保持引用,无法回收,如果调用 ThreadLocal#set 方法设置的变量没有调用 remove 方法清理,则一直保持在 ThreadLocalMap 中,此时保持的值处于一个无法被回收的状态.

LockSupport 类

LockSupport 类提供了 park 系列和 unpark 系列方,内部调用 UNSAFE 类实现,UNSAFE 类由 JVM 在本地 C++ 中实现,具体实现与平台有关,park 方法在 Linux 下使用的是系统方法 pthread_cond_wait 实现;unpark 方法在 Linux 下是使用 pthread_cond_signal 实现的.

在 park (Object) 的时候,dump 线程会发现由 wait 的对象信息,是存储在 Thread.parkBlocker 字段上,通过 UNSAFE.objectFieldOffset 写到线程对象上的.

AQS 线程同步器

线程同步器实现了一个线程排队等待的框架,线程排队等待的实现离不开两点实现:①创建一个排队的线程,因为需要前后节节点的快速获取,此队列通常使用链表实现;②利用线程的等待通知机制将队列中的线程阻塞住.Java 中的线程和 OS 的线程是 1:1 映射的,线程的等待利用的是 OS 的系统调用实现,封装成了 Jdk 中的 LockSupport 类;这个机制保证了线程的有序性.

线程同步器还提供了一种原子性争抢资源的能力,他内部存储了一个状态标志,用 volatile 修饰,是一个数字类型,利用 cas 操作进行修改操作,如果操作成功,表示这个操作期间没有其他线程竞争,或者其他线程都竞争失败,此时其他线程发现自己竞争失败则进入等待队列,竞争成功的线程得以真正的执行,执行结束的时候再次修改这个状态表示,此时因为 volatile 的语义特性,最后这个修改操作会保证数据被 (所有 CPU 缓冲区) 写回主存,这个操作间接的保证了线程的原子性,可见性.

线程同步器提供了独占和共享的两组获取锁和释放锁的抽象方法

  1. tryAcquire 返回 true 表示获取到了锁,false 表示没有获取到锁,并且会进入自旋状态.
  2. tryAcquireShared 返回负数表示获取失败,0 表示并发量耗尽 (不能再获取了), 正数表示获取成功,还能继续获取,(但是正数本身的值不做意义)
线程等待实现

实现线程等待在一个死循环中无限尝试获取锁,直到成功或者被中断,在执行过程中有可能会被 LockSupport.park 方法暂停掉,在首节点执行完毕的时候会唤醒排在其之后的第一个可运行节点,节点被唤醒之后马上重新在死循环中争抢锁.

总的来说,就是在死循环中处理等待通知机制.

独占锁的处理
  1. 线程首先尝试获取资源,如果获取失败,则使用 cas 操作加入等待队列;如果获取成功,则标记为首节点.
  2. 获取失败的线程进入资源申请的死循环,每次循环首先检查,如果前驱节点是首结点,则尝试 CAS 获取锁,如果不是,则调用 LockSupport.park 进入等待状态.
  3. 首节点的任务处理完毕之后会释放锁,在释放锁的时候唤醒下一个可执行的节点的线程.
  4. 唤醒的线程如果处于次 (前驱是首节点) 节点,此时线程应该处于一个自旋获取锁的状态,此时将快速的获取锁,否则处于一个等待状态,则恢复开始再次获取资源或进入等待.
共享锁的处理

初始一个令牌桶,桶中包含多个锁,锁的数量影响并发数量,,每次获取锁,则从桶中拿走一个 (或者多个) 令牌,每次释放,则从桶中放回锁,每次取出和释放的过程使用 cas 操作,保证原子性,则允许通过的线程是:桶大小 (锁总数)/ 桶粒度 (每次获取锁数)

Condition 等待条件

Condition 在自己内部又做了一个队列,在 wait 的时候将当前线程放入队列,在 notify 的时候从队列唤醒,因为 condition 要求在调用 wait-notify 的时候必须获取锁,所以内部在 wait 和 notify 的时候无需 cas 操作.

如何保证可见性

unlock 操作会 cas 修改一个 volatile 变量,而 volatile 变量的修改会运行一条 LOCK 指令,该指令会锁缓存 (或者总线) 保证其他 CPU 不会并发访问数据,并且写完数据之后会立即将这颗 CPU 的将缓冲区数据刷入主存,同时

并发工具类

CountDownLatch

利用 AQS 实现,比如 5 个线程,初始设置 state=5; 每次结束一个线程 (countDown) 做减法: state=state-1;await 等待方法则是从申请 1 个值,但是在 tryAcquireShared 方法中实现的是如果状态不是 0, 则不允许,如果是 0, 则允许.

Semaphore

利用 AQS 实现,类似 CountDownLatch, 初始化一个令牌桶,每次申请 N 个令牌桶,不够申请则等待。区别是 Semaphore 只要桶内数量足够申请就行,CountDownLatch 要求必须只剩下 0 个.

CyclicBarrier

利用 ReentrantLock 以及 ReentrantLock#newCondition , 内部使用一个 int count 做计数,每次 await 一个线程的时候,加锁,做 count--, 如果 count 是 0, 则重新开始,重新开始的时候 condition.notifyall; 如果不是 0, 则 condition.await (此时会释放锁);

原子操作类

大多数原子类,都是利用一个 Unsafe 类和一个 volatile 变量包装的。执行原子更新的时候适用 cas 方式来执行更新.

Unsafe 类底层实现依赖于汇编指令 cmpxchg 实现原子性操作.

但是 lazySet 却是使用 UNSAFE.putOrderedInt 方法执行的,底层虚拟机调用的是 C++ 方法的 SET_FIELD_VOLATILE(obj, offset, jlong, x); 仅仅是实现了 volatile 的语义操作,该操作导致修改后不是立即可见的.

计数器类

核心实现是创建一个 base 值,做基础累加,在 cas 操作 base 值失败之后,根据 CPU 数量创建多个 Cell, 将计算的逻辑分散到各个 cell 中,此时每个 CPU 最多操作自身的一个 cell, 几乎是资源独占的模式工作,所以累加效率会大有提高,但是读取值的时候需要将所有 cell 以及 base 值,读取速度可能变慢.

就好像是一个分桶排序的算法,因为一个桶放了太多数排序太慢,而分为多个桶来计算;或者说数据库表太大,则分成多个表.

其实我们的程序大多数都是写多读少的

并发编程锁

Java 中锁的实现都是基于 AQS 实现的,具体一个锁的实现会选择实现 AQS 的独占锁或者共享锁的一种,在初始化的时候初始化一个 state, 独占锁通常为 1, 共享锁可能是大于 1, 获取锁则 acquire (1), 释放锁则 release (1).

ReentrantLock

ReentrantLock 实现了初始 state=0 的一个 AQS, 每次 acquire 则增加 1,release 则减少 1

重入实现

acquire 的时候如果当前线程是获取到锁的线程,则会再次增加 1, 来实现重入获取,release 在发现 state=0 的时候,则将当前获取锁的线程标记为 null, 来实现重入释放.state 不是 0, 且当前线程没有持有锁,则不允许获取锁.

公平与非公平实现

公平锁在请求锁的时候会判断是不是有一个前驱节点在排队,如果有则放弃 cas 操作 (tryAcquire 返回 false), 非公平锁则不判断直接进行 cas 操作.

判断前驱节点在排队的方法是查看前首节点 (当前执行的节点) 的下一节点不是当前线程,换言之:当前线程不是队列中排队第二的待执行线程.

ReentrantReadWriteLock

读写锁实现了 AQS 的共享锁和独占锁两种模式,原理是将一个 int 值,按位分割成高 16 位来标记读,和低 16 位来标记写。内部有一个 int exclusiveCount (int c) 用来计算独占锁的数量,一个 int sharedCount (int c) 用来计算共享锁的数量,

写锁的获取在有读锁已被获取的情况下进行等待,没有读的时候跟 ReentrantLock 类似;读锁在没有写锁的情况下就可以用 cas 尝试更新 state 了,失败则放入等待队列,用死循环来等待.

写锁重入的实现跟 ReentrantLock 类似,只不过只能支持低 16 位最大数次重入,读锁的重入利用 ThreadLocale 来存放重入次数.

写锁获取

tryAcquire 方法注释说:

  1. 如果读状态不是 0, 或者写锁 (独占锁) 已经被获取,单不是当前线程,则直接失败 ()
  2. 如果写锁的重入次数太大,(16 位存放不下) 则抛出 error (为什么是 error, 不是 exception 呢?)
  3. 其他情况下,这个线程将会执行类似 ReentrantLock 的重入锁获取机制

读锁获取

tryAcquireShared 方法注释说:

  1. 如果独占锁数量不是 0, 并且获取的线程不是当前线程,则获取失败

  2. 其他情况下判断是否需要进入队列 (此处用于处理公平不公平实现), 如果不需要,则尝试 CAS 更新 state

  3. 在上一步失败的情况下。进入一个循环重试的状态,死循环尝试获取锁,直到成功.

锁降级

根据读锁的获取流程可以看出,获取读锁的时候,如果当前线程已经获取了独占的写锁,也可以获取读锁,此被称为锁降级. 但是写锁获取的时候,如果已经有读锁了,直接放弃,意味着无法锁升级.

锁降级是位了写完数据之后马上可以读取到数据,并且在独占的写锁中获取读锁,然后释放写锁,保证了在获取读锁的过程中不会有其他线程写入数据,保证自己写完之后看到的数据是最新的.

总结

我们讨论了 Java 并发编程的基础,从其面临的困难,以及如何解决这些困难的方案,到方案的具体实现。本文的基础章节很啰嗦,但是他很重要,那些高级的工具类,以及 lock 都是如何实现的并发包,但是基础章节却是为何要这么实现,以及如何思考到这些东西的思维演进。有了这些基础,我们可以很容易的实现线程池,阻塞队列,等高级工具.

  1. 并发编程的难点,这些难点在现实世界的样子
  2. 这些难点在现实世界以及计算机中的解决方案
  3. 这些解决方案的实现思路
  4. 这些思路的具体实现
  5. 这些具体实现的高级工具

java 并发编程 CountDownLatch 使用模型 CountDownLatch + redis

java 并发编程 CountDownLatch 使用模型 CountDownLatch + redis

final CountDownLatch latch = new CountDownLatch(4);
            if (StringUtils.isBlank(dto.getUserId())) {
                dto.setUserId(ShiroUtils.getCurrentUserId());
            }
            xxx applyNumVo = new xxx();


            
            String reqId = IdGenerator.id();
            ExecutorService es = Executors.newSingleThreadExecutor();
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        
                        //业务执行结果
                        redisUtil.set(reqId+"needToExecuteNum",needToExecuteNum,60);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
            es.shutdown();

            ExecutorService es2 = Executors.newSingleThreadExecutor();
            es2.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //业务执行结果

                        redisUtil.set(reqId+"alreadyExecuteNum",alreadyExecuteNum,60);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
            es2.shutdown();


            ExecutorService es3 = Executors.newSingleThreadExecutor();
            es3.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //业务执行结果
                        redisUtil.set(reqId+"createOfMineNum",createOfMineNum,60);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
            es3.shutdown();


            ExecutorService es4 = Executors.newSingleThreadExecutor();
            es4.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //业务执行结果
                        redisUtil.set(reqId+"noticeMeNum",noticeMeNum,60);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    latch.countDown();
                }
            });
            es4.shutdown();

            latch.await();
            //结果封装
            applyNumVo.setNeedToExecuteNum(((Number) redisUtil.get(reqId+"needToExecuteNum")).longValue());
            applyNumVo.setAlreadyExecuteNum(((Number)  redisUtil.get(reqId+"alreadyExecuteNum")).longValue());
            applyNumVo.setCreateOfMineNum(((Number)  redisUtil.get(reqId+"createOfMineNum")).longValue());
            applyNumVo.setNoticeMeNum(((Number)  redisUtil.get(reqId+"noticeMeNum")).longValue());

关于python 并发编程 多路复用IOpython3并发的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于IO通信模型(三)多路复用IO、JAVA I/O(六)多路复用IO、Java 并发编程 - 并发难点及解决方法、java 并发编程 CountDownLatch 使用模型 CountDownLatch + redis等相关内容,可以在本站寻找。

本文标签: