13 2 2019

基本概念

  • 并发:一个时间段内,有多个程序在同一个CPU上运行(但任意时刻只有一个在CPU上运行);
  • 并行:任意时间点上,有多个程序同时运行在多个CPU上;
  • 同步:代码调用I/O操作时,必须等待I/O操作完成才返回;
  • 异步:代码调用I/O操作时,不必等I/O完成就返回;
  • 阻塞:调用函数时当前线程被挂起;
  • 非阻塞:调用函数时当前线程不会被挂起(立即返回)。

UNIX五种I/O模型

五种I/O模型
  • 阻塞式I/O
  • 非阻塞式I/O
  • I/O多路复用
  • 信号驱动式I/O
  • 异步I/O

阻塞式I/O

阻塞式I/O
  • 应用程序被阻塞,直到数据被复制到应用进程缓冲区才返回(如socket_http中的client.connect(),CPU停止运行等待接入);
  • 程序阻塞的过程中,其他程序仍然可以运行(不是整个操作系统阻塞),因此不会影响CPU利用率。

非阻塞式I/O

非阻塞式I/O
  • 应用进程执行系统调用后,如数据未就绪则内核会返回一个错误码,应用进程可以继续执行(如socket_http中的client.setblocking(False),请求连接后立即返回);
  • 返回不代表(如网路请求中的三次握手)已完成,需要CPU循环不断询问连接是否建立(client.send("..."),可能会抛出异常,需要异常+循环处理);
  • 由于使用非阻塞式I/O需要不断请求内核态,CPU(需要处理更多系统调用)开销很大;
  • 内核态接收网络请求后退出循环,把数据复制到用户态;
  • 对于轮询过程中需要执行其他操作的场景,性能比阻塞式I/O好。

I/O多路复用

I/O多路复用
  • 单个进程(线程)具备处理多个I/O事件的能力,避免高并发场景下多进程/多线程创建和切换的开销,有select,poll,epoll三种;
  • 单个进程(线程)同时监听多个句柄的状态,状态发生变化时(内核数据到达)可以马上处理,其中句柄状态变化的回调由程序员完成;
  • 本质上是同步I/O,读写时间就绪后自己负责进行读写(这个过程是阻塞的);
  • 其中select、poll需要轮询所有句柄,随数量增多性能下降;在高并发场景下优先选用epoll,但在并发性不高、连接很活跃(频繁开启关闭)时select比epoll好。

信号驱动式I/O

信号驱动式I/O
  • 应用进程使用sigaction系统调用,发生I/O时内核立即返回,应用程序可以继续执行(等待数据阶段非阻塞);
  • 当内核中有数据到达时向应用程序发出SIGIO信号,应用程序接收到信号在信号处理程序中调用recvfrom,将数据从内核复制到应用程序中;
  • 由于不需要轮询系统调用,信号驱动I/O的CPU利用率比非阻塞式I/O更高。

异步I/O

异步I/O
  • 应用程序执行aio_read系统调用后立即返回,可以继续执行,不会阻塞;
  • 内核在所有操作完成后向应用进程发出信号(句柄状态变化的回调由系统完成);
  • 与信号驱动I/O的区别在于:异步I/O的信号是通知应用进程I/O完成,而信号驱动I/O的信号是通知应用进程可以开始I/O。

Select,回调,事件循环

  • 回调函数:提供函数供一定条件满足后调用(回调函数中都是非I/O操作,性能很高);
  • 事件循环:不断循环列表请求句柄状态,发现状态变化时执行回调函数;
  • 高并发:驱动程序运行的loop是单线程运行(不会有内存消耗和切换问题)、非阻塞的,只会对就绪句柄执行回调函数,不会等待I/O(除非所有句柄都在等待)
import time
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()        # select函数更高层次的封装,根据环境可以自动选择select、poll或epoll
urls = []
stop = False

class Fetcher:
    def connected(self, key):
        '''
        回调函数
        :param key:
        :return:
        '''
        # 执行回调函数时,首先要对句柄取消注册
        selector.unregister(key.fd)     
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        
        # 注册句柄,监听读状态,执行回调函数readable
        selector.register(self.client.fileno(), EVENT_READ, self.readable)      

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)     # 数据读取完成
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # select需要非阻塞IO
        self.client.setblocking(False)    
        try:
            self.client.connect((self.host, 80))        
        except BlockingIOError as e:
            pass

        # 注册句柄,当发生写事件时,执行回调函数connected
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    # 事件循环,不停请求socket的状态并调用对应的回调函数
    # select本身是不支持register模式(selector是对select的封装,提供了register)
    # socket状态变化以后的回调由程序员完成
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data        # 执行注册时执行的回调函数
            call_back(key)    

# 异步
fetcher = Fetcher()
start_time = time.time()
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    urls.append(url)
    fetcher = Fetcher()
    fetcher.get_url(url)
loop()
print(time.time() - start_time)

# 同步(注意self.client.setblocking(True))
start_time = time.time()
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    get_(url)
print(time.time() - start_time)

协程

以上几种I/O模型存在以下问题:

  • 回调:代码可读性差,共享状态管理困难,异常处理困难;
  • 多线程:线程间同步、锁并发性能差,线程创建消耗内存大、切换开销大;
  • 同步:并发度低。

因此可考虑使用协程:

  • 采用同步的方式编写异步(事件循环 + I/O多路复用)代码代替回调,使用单线程切换任务(不再需要锁);
  • 自主编写调度函数,并发性能远高于线程间切换;
  • 调度函数有多个入口:遇到I/O操作把当前函数暂停、切换到另一个函数执行,在适当时候恢复。
  • 使用生成器(见“迭代器,生成器”)结合事件循环可实现协程;
  • 协程 + 事件循环的效率不比回调 + 事件循环高,其目的在于简便地解决回调复杂的问题。

async与await

为了将语义变得更加明确,Python 3.5后引入了async和await关键词用于定义原生协程;

import types

async def downloader(url):              # 使用原生协程
    return "ywh"

@types.coroutine
def downloader(url):                    # 使用生成器实现协程
    yield "ywh"     

async def download_url(url):            # async和await必须成对使用
    html = await downloader(url)        # await:执行费时操作(生成器不能直接用于await,要加上装饰器或async)
    return html


if __name__ == "__main__":
    coro = download_url("http://www.imooc.com")
    # next(None)
    coro.send(None)     # 使用原生协程只能使用send(None)

生成器实现协程

获取生成器的状态:

import inspect
def gen_func():
    yield 1
    return "ywh"

if __name__ == "__main__":
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))
    next(gen)
    print(inspect.getgeneratorstate(gen))

使用生成器实现协程:

  • 一般的生成器只能作为生产者,实现为协程则可以消费外部传入的数据;
  • 使用value = yield from xxx的生成器表示返回值给调用方、且调用方通过send方法传值给生成器函数;
  • 主函数中不能添加耗时的逻辑,如把I/O操作通过yield from做异步处理;
  • 最终实现通过同步的方式编写异步代码:在适当的时候暂停、恢复启动函数。
def gen_func():
    value = yield 1
    return "ywh"
import inspect
import socket

def get_socket_data():      # 模拟从socket中获取数据,唤醒downloader
    yield "ywh"             # 如发生异常,则会抛出给downloader

def downloader(url):        # 主方法中不能添加耗时操作
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))      # 阻塞不会消耗cpu
    except BlockingIOError as e:
        pass

    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    source = yield from get_socket_data()       # 暂停,直到socket获取到数据再往下执行
    html_data = source.decode("utf8").split("\r\n\r\n")[1]
    print(html_data)

def download_html(html):
    html = yield from downloader()

if __name__ == "__main__":
    # 协程的调度:事件循环 + 协程模式(单线程)
    pass

 

延伸阅读
    < /body> < /html>