我的python多线程和多进程

隔壁老王 等级 397 0 0

伪并发 异步和多路复用IO

线程存在空闲 from multiprocessing.dummy import Pool
from multiprocessing.dummy import Pool as ThreadPool
  pool = ThreadPool(20)
            pool.map(job_worker, result_cursor)
            pool.close()
            pool.join()
"""
可以实现并发
但是,请求发送出去后和返回之前,中间时期线程空闲
编写方式:
    - 直接返回处理
    - 通过回调函数处理
"""
########### 编写方式一 ###########
"""
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def task(url):
    response = requests.get(url)
    print(url,response)
    # 写正则表达式


pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    pool.submit(task,url)

pool.shutdown(wait=True)
"""
效果相同,增加会掉函数
########### 编写方式二 ###########
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def task(url):
    """
    下载页面
    :param url:
    :return:
    """
    response = requests.get(url)
    return response

def done(future,*args,**kwargs):
    response = future.result()
    print(response.status_code,response.content)

pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    v = pool.submit(task,url)
    v.add_done_callback(done)

pool.shutdown(wait=True)
多进程实现并发
"""
可以实现并发
但是,请求发送出去后和返回之前,中间时期进程空闲
编写方式:
    - 直接返回处理
    - 通过回调函数处理
"""

########### 编写方式一 ###########
"""
from concurrent.futures import ProcessPoolExecutor
import requests
import time

def task(url):
    response = requests.get(url)
    print(url,response)
    # 写正则表达式


pool = ProcessPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    pool.submit(task,url)

pool.shutdown(wait=True)
"""

########### 编写方式二 ###########
from concurrent.futures import ProcessPoolExecutor
import requests
import time

def task(url):
    response = requests.get(url)
    return response

def done(future,*args,**kwargs):
    response = future.result()
    print(response.status_code,response.content)

pool = ProcessPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    v = pool.submit(task,url)
    v.add_done_callback(done)

pool.shutdown(wait=True)
异步IO(多线程+协程)
# 协程只是切换,不能控制什么时候切回来,异步IO实现回调
角色:使用者
    - 多线程
    - 多线程
    - 协程(微线程) + 异步IO =》 1个线程发送N个Http请求
        - asyncio
            - 示例1:asyncio.sleep(5)
            - 示例2:自己封装Http数据包
            - 示例3:asyncio+aiohttp
                aiohttp模块:封装Http数据包 pip3 install aiohttp
            - 示例4:asyncio+requests
                requests模块:封装Http数据包 pip3 install requests
        - gevent(内部异步IO+切换),greenlet+异步IO
                pip3 install greenlet
                pip3 install gevent
            - 示例1:gevent+requests
            - 示例2:gevent(协程池,最多发多少个请求)+requests
            - 示例3:gevent+requests => grequests
                    pip3 install grequests

        - Twisted
            pip3 install twisted
        - Tornado
            pip3 install tornado

        =====> gevent > Twisted > Tornado > asyncio
异步IO
import asyncio


@asyncio.coroutine
def func1():
    print('before...func1......')
    yield from asyncio.sleep(5)
    print('end...func1......')


tasks = [func1(), func1()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
异步IO实现tcp发http
import asyncio


@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    reader, writer = yield from asyncio.open_connection(host, 80)
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()


loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
异步IO实现发http
import aiohttp
import asyncio


@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)
    print(url, response)
    response.close()


tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
异步IO + requests
import asyncio
import requests


@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, response.content)


tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
gevent + requests
import gevent

import requests
from gevent import monkey

monkey.patch_all()


def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)

# ##### 发送请求 #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

# ##### 发送请求(协程池控制最大协程数量) #####
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])
封装gevent + requests
import grequests


request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]


# ##### 执行并获取响应列表 #####
# response_list = grequests.map(request_list)
# print(response_list)


# ##### 执行并获取响应列表(处理异常) #####
# def exception_handler(request, exception):
# print(request,exception)
#     print("Request failed")

# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)
twisted
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


def one_done(arg):
    print('----------------------------------------------- %s' % arg)


def all_done(arg):
    print('done===========================================')
    reactor.stop()


@defer.inlineCallbacks  # 发送Http请求,立即返回
def task(url):
    res = getPage(bytes(url, encoding='utf8'))  # 发送Http请求
    res.addCallback(one_done)
    yield res


url_list = [
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
]

defer_list = []  # [特殊,特殊,特殊(已经向url发送请求)]
for url in url_list:
    v = task(url)
    defer_list.append(v)

d = defer.DeferredList(defer_list)
d.addBoth(all_done)  # d特殊对象里有特殊url发送列表

reactor.run()  # 死循环 DeferredList查询,检测完成对象执行one_done,有计数器,所有执行完,执行all_done
tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

COUNT = 0
def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
        # 方法同twisted
        # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start() # 死循环
自己实现IO
角色:NB开发者

    1. socket客户端、服务端
        连接阻塞
        setblocking(0): 无数据(连接无响应;数据未返回)就报错  传0或者false所有socket都不会阻塞(包括连接和接收)

    2. IO多路复用 就是while循环监听多个socket对象
        客户端:
        try:
            socket对象1.connet()
            socket对象2.connet()
            socket对象3.connet()
        except Ex..
            pass

        while True:
            r(接收端),w(发送端),e(异常) = select.select([socket对象1,socket对象2,socket对象3,],[socket对象1,socket对象2,socket对象3,],[],0.05)
            r = [socket对象1,] # 表示有人给我发送数据
                xx = socket对象1.recv()
            w = [socket对象1,] # 表示我已经和别人创建连接成功:
                socket对象1.send('"""GET /index HTTP/1.0\r\nHost: baidu.com\r\n\r\n"""')


    3. 
        class Foo:

            def fileno(self):
                obj = socket()
                return obj.fileno()

        r,w,e = select.select([socket对象?,对象?,对象?,Foo()],[],[])
        # 对象必须有: fileno方法,并返回一个文件描述符

        ========
        a. select内部:对象.fileno()
        b. Foo()内部封装socket文件描述符

 IO多路复用: 就是while循环监听多个socket对象
 异步IO: 非阻塞的socket+IO多路复用
自己实现异步IO
class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None

        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []  # 用于检测是否已经连接成功

    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80,))
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):

        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, '连接成功...')
                # 只要能循环到,表示socket和服务器端已经连接成功
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r,是HttpRequest
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        recv_data += chunck
                    except Exception as e:
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print('保存到文件', response.header_dict)


def f2(response):
    print('保存到数据库', response.header_dict)


url_list = [
    {'host': 'www.baidu.com', 'callback': f1},
    {'host': 'cn.bing.com', 'callback': f2},
    {'host': 'www.cnblogs.com', 'callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])

req.run()
收藏
评论区

相关推荐

python多线程原理和详解(一)
python多线程原理和详解 线程概念 1 . 线程是什么? 线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行。
1. 这才是 Python 学习的正确起手姿势,滚雪球学 Python
在博客上,我写了很多关于 Python 的文章,很多朋友可能觉得橡皮擦应该是一个 Python 开发人员或者一个技术开发人员,但很遗憾的告诉大家,橡皮擦恰好是很多公司中开发人员的对立面【产品经理】。但我是一个相当懂技术人的产品经理。 一、Python 初次接触,Python 变量与基本运算 1.1 滚雪球学 Python 课程前置导语 从本篇博
15. Python 程序运行速度如何提高十倍?第一遍滚雪球学 Python 收工
本篇文章将给大家介绍 Python 多线程与多进程相关知识,学习完该知识点之后,你的 Python 程序将进入另一个高峰。 <center<font colorred缓解一下视疲劳</font</center 15. Python 程序运行速度如何提高十倍?第一遍滚雪球学 Python 收工(https://imghelloworld.oss
python多线程详解
python中的多线程是一个非常重要的知识点,今天为大家对多线程进行详细的说明,代码中的注释有多线程的知识点还有测试用的实例。 码字不易,阅读或复制完了,点个赞! import threading from threading import Lock,Thread import time,os '''
Python多进程 - 实现多进程的几种方式
方式一: os.fork() coding:utf8 """ pidos.fork() 1.只用在Unix系统中有效,Windows系统中无效 2.fork函数调用一次,返回两次:在父进程中返回值为子进程id,在子进程中返回值为0 """ import os pidos.fork() if pid0:
Python Sanic 高并发服务开发指南
技术基础 AsyncIO Python 3.4 开始引入 AsyncIO(https://docs.python.org/3/library/asyncio.html) 模块,使得 Python 也支持异步 IO。3.5 版本里添加了 async/await 关键字,使得异步 IO 代码编写更加方便。3.6 和 3.7 版本继续进行了完善
我的python多线程和多进程
线程存在空闲 from multiprocessing.dummy import Poolfrom multiprocessing.dummy import Pool as ThreadPool pool ThreadPool(20) pool.map(job_worker, result_cursor)
Python 爬取留言板留言(二):多线程版+selenium模拟
一、项目概述本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇。本篇在第一篇的基础上做了一些改进1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线
Python 爬取留言板留言(三):多进程版+selenium模拟
一、项目概述本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇。本篇在第二篇的基础上做了一个主要改进:从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一
商业数据分析从入门到入职(8)Python模块、文件IO和面向对象
前言本文先介绍了Python中程序、模块和包的基本使用,并在此基础上介绍了Python标准库。然后详细介绍了Python中的文件IO操作,包括文本文件、二进制文件的读写和其他IO操作。最后介绍了面向对象,包括类的定义、继承的使用、鸭子类型和魔法方法。 一、程序、模块和包 1.自定义模块和包之前我们使用的.ipynb文件都不是纯Python文件,
Python初学者必备书籍《Python入门经典》高清PDF版|百度网盘免费下载|Python初学者,自学Python必读
提取码:1028以及前文提到的学习路线图内容简介Python是一种解释型、面向对象、动态数据类型的高级程序设计语言。Python可以用于很多的领域,从科学计算到游戏开发。《Python入门经典》是面向Python初学者的学习指南,详细介绍了Python编程基础,以及一些高级概念,如面向对象编程。全书分为24章。第1章介绍了Python的背景和安装方法。第2章
流畅的pythonPDF高清版
提取码:1028内容简介 · · · · · ·【技术大咖推荐】“很荣幸担任这本优秀图书的技术审校。这本书能帮助很多中级Python程序员掌握这门语言,我也从中学到了相当多的知识!”——Alex Martelli,Python软件基金会成员“对于想要扩充知识的中级和高级Python程序员来说,这本书是充满了实用编程技巧的宝藏。”——Daniel Greenf
全网最全python学习路线图,让学习不迷路
学习Python有一段时间了,最近也是在不断的整理Python相关的基础知识和学习一些新的知识,想来分享给大家。我刚开始接触Python时,和大多数初学者一样不知道从那里开始学习python,我也在网上找了许多python相关的资料来学习,但是资料多也不见得就好,因为不知道从哪里开始下手,走了许多弯路。后面我就整理了一套对初学者来说学习python能很快上手
python的这些必备干货知识点,快来看看有没有你不了解的?
Python是当前主流的编程语言之一,其优点有:一:语法简洁,可以让使用者用少量的代码完成相对复杂的效果。二:标准库和第三库多,功能强大;三:站在了人工智能和大数据的风口上;像国内的豆瓣呀,知乎呀等等知名网站都是基于python开发的,而Youtube、Reddit、Dropbpx也是用python的框架开发的。近几年学习python的小伙伴越来越多,那么p
怎么操作能使Python代码运行起来速度飞快?
Python是开发人员当中流行的编程语言之一。它应用广泛,无论是Web开发还是机器学习。Python大受欢迎的原因有很多,比如社区支持、出色的库、广泛用于机器学习和大数据以及简单的语法。尽管有这么多优点,Python还是有一个缺点:速度慢。作为一种解释性语言,Python的速度不如其他编程语言。不过,我们可以用几个技巧来克服这个问题。本文将分享几个Pytho