Python分布式进程

协变苔藓
• 阅读 1499

分布式进程:
分布式进程是指的是将Process进程分布到多台机器上,充分利用多台机器的性能完成复杂的任务。在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

举个例子:做爬虫程序时,常常会遇到这样的场景,我们想抓取图片的链接地址,将链接地址存放到Queue中,另外的进程负责从Queue中读取链接地址进行下载和存储到本地。现在把这个过程做成分布式,一台机器上的进程负责抓取链接,其它机器上的进程负责下载存储,那么遇到的主要问题是将Queue暴露到网络中,让其它机器进程都可以访问,分布式进程就是将这一个过程进行了封装,我们可以将这个过程称为本队列的网络化。

创建分布式进程需要一个服务进程与任务进程:

服务进程创建:
建立队列Queue,用来进行进程间的通信。服务进程创建任务队列task_queue,用来作为 传递任务给任务进程的通道;服务进程创建结果队列result_queue,作为任务进程完成任务后回复服务进程的通道。在分布式多进程环境下,必须通过由Queuemanager获得Queue接口来添加任务.
把第一步中建立的队列在网络上注册,暴露给其它进程(主机),注册后获得网络队列,相当于本队队列的映像.
建立一个险象(Queuemanager(BaseManager))实例manager,绑定端口和验证口令。
启动第三步中建立的实例,即启动管理manager,监管信息通道
通过管理实例的方法获得通过网络访问的Queue对象,即再把网络队列实体化成可以使用的本地队列.
创建任务到"本地"队列中,自动上传任务到网络队列中,分配给任务进程进行处理。
注意:我这里是基于window操作系统的,linux系统会有所不同

coding:utf-8

taskManager.py for win

> import Queue
> from multiprocessing.managers import BaseManager
> from multiprocessing import freeze_support

任务个数

task_num = 10

定义收发队列

task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)


def get_task():
    return task_queue


def get_result():
    return result_queue

创建类似的QueueManager

class QueueManager(BaseManager):
    pass


def win_run():
    # windows下绑定调用接口不能使用lambda,所以只能先定义函数再绑定
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # 绑定端口并设置验证口令,windows下需要填写IP地址,Linux下不填,默认为本地
    manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
# 启动
manager.start()

# 通过网络获取任务队列和结果队列
task = manager.get_task_queue()
result = manager.get_result_queue()

try:

    # 添加任务
    for i in range(10):
        print 'put task %s...' % i
        task.put(i)
    print 'try get result...'

    for i in range(10):
        print 'result is %s' % result.get(timeout=10)



except:
    print 'manage error'
finally:
    # 一定要关闭,否则会报管理未关闭的错误
    manager.shutdown()
    print 'master exit!'

if name == '__main__':

# windows下多进程可能会出现问题,添加这句可以缓解
freeze_support()
win_run()

任务进程
使用QueueManager 注册用于获取Queue的方法名称,任务进程只能通过名称来在网络上获取Queue
连接服务器中,端口和验证口令注意保持与服务进程中完全一致
从网络上获取Queue,进行本地化
从Task队列获取任务,并把结果result队列

coding:utf-8

import time
from multiprocessing.managers import BaseManager

创建类似的QueueManager:

class QueueManager(BaseManager):

pass

第一步:使用QueueManager注册用于获取Queue的方法名称

QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

第二步:连接服务器

server_addr = '127.0.0.1'
print "Connect to server %s" % server_addr

端口和验证口令注意保持与服务进程完全一致

m = QueueManager(address=(server_addr, 4000), authkey='qty')

从网络连接

m.connect()

第三步:获取Queue的对象

task = m.get_task_queue()
result = m.get_result_queue()

第四步:从task队列获取任务,并把结果写入result队列:

while not task.empty():

index = task.get(True, timeout=10)
print 'run task download %s' % str(index)
result.put('%s---->success ' % str(index))

处理结束

print 'worker exit.'

执行结果

先运行:服务进程得到结果

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...

再立即运行:任务进程得到结果,防止进程走完后得不到结果,这里一定要立即执行

Connect to server 127.0.0.1
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.

最后再回头看服务进程窗口的结果

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success 
result is 1---->success 
result is 2---->success 
result is 3---->success 
result is 4---->success 
result is 5---->success 
result is 6---->success 
result is 7---->success 
result is 8---->success 
result is 9---->success 
master exit!

这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就把任务分布到几台甚至几十台机器上,实现大规模的分布式爬虫

点赞
收藏
评论区
推荐文章
Irene181 Irene181
4年前
一篇文章带你了解Python的分布式进程接口
一、前言    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。可以写一个服务进程作为调度者,将任务分
Stella981 Stella981
4年前
Python 并行分布式框架之 Celery
Celery(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.celeryproject.org%2F) (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。架构设计!(http://s
Stella981 Stella981
4年前
Hadoop完整搭建过程(三):完全分布模式(虚拟机)
1完全分布模式完全分布模式是比本地模式与伪分布模式更加复杂的模式,真正利用多台Linux主机来进行部署Hadoop,对集群进行规划,使得Hadoop各个模块分别部署在不同的多台机器上,这篇文章介绍的是通过三台虚拟机进行集群配置的方式,主要步骤为:准备虚拟机:准备虚拟机基本环境ipHost配置:手
Stella981 Stella981
4年前
Crawlscrapy分布式爬虫
1.概念:多台机器上可以执行同一个爬虫程序,实现网站数据的分布爬取2.原生的scrapy是不可以实现分布式式爬虫  a)调度器无法共享  b)管道无法共享3.scrapyredis组件:专门为scrapy开发的一套组件,该组件可以让scrapy实现分布式  a)pipinstallscrapyredis4.分布式爬取的流程:
Wesley13 Wesley13
4年前
.net RabbitMQ 介绍、安装、运行
RabbitMQ介绍什么是MQ1.MessageQueue(简称:MQ),消息队列2.顾名思义将内容存入到队列中,存入取出的原则是先进先出、后进后出。3.其主要用途:不同进程Process/线程Thread之间通信什么是RabbitMQ1.RabbitMQ是一个消
Wesley13 Wesley13
4年前
MPI多机器实现并行计算
  最近使用一个系统的分布式版本搭建测试环境,该系统是基于MPI实现的并行计算,MPI是传统基于msg的系统,这个框架非常灵活,对程序的结构没有太多约束,高效实用简单,下面是MPI在多台机器上实现并行计算的过程。  这里准备使用三台机器,假设为A,B,C,对应IP分别为:192.168.86.16(A),192.168.86.108(B),192.168
Stella981 Stella981
4年前
Scrapy框架之分布式操作
一、分布式爬虫介绍  分布式爬虫概念:多台机器上执行同一个爬虫程序,实现网站数据的分布爬取。1、原生的Scrapy无法实现分布式爬虫的原因?调度器无法在多台机器间共享:因为多台机器上部署的scrapy会各自拥有各自的调度器,这样就使得多台机器无法分配start\_urls列表中的url。管
Stella981 Stella981
4年前
Docker Get Started IV
4\.Swarms介绍在前面的部分,你知道了如何写一个应用以及如何运行在生产环境中,然后将它变为一个服务,在同一个进程中将服务能力伸缩到原来的5倍。在本部分,你将在集群上部署一个应用,运行在多台机器上。通过将多个机器加入到docker化的集群中,多容器多机器的应用是可能的,这个docker化的集群被称为蜂群。理解
Stella981 Stella981
4年前
Python 浅析线程(threading模块)和进程(process)
    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务进程与线程什么是线程(threading)?Athreadisanexecutioncontext,whichisall
Stella981 Stella981
4年前
Python process (进程)
进程(process)进程是对各种资源管理的集合,包含对各种资源的调用、内存的管理、网络接口的调用进程要操作CPU必须先启动一个线程,启动一个进程的时候会自动创建一个线程,进程里的第一个线程就是主线程程序执行的实例有唯一的进程标识符(pid)multiprossing模块
Stella981 Stella981
4年前
Golang并发解读
进程与线程概念在面向进程设计的系统中,进程(process)是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。进程是程序(指令和数据)的真正运行实例。用户下达运行程序的命令后,就会产生进程。同一程序可产生多个进程(一对多关系),以允许同时有多位用户运行同一程序,却不会相冲突。线程(th