手把手教你在Windows下设置分布式队列Celery的心跳轮询

Karen110
• 阅读 1043

/1 前言/

大家好,我是吴老板。用Celery 官方的话来说,Celery 是一个非常优秀的分布式队列,可应用于分布式共享中间队列和定时任务等等。

/2 版本的差异/

`Celery` 有很多个版本,各版本之间的差异可谓不小,比如最新的 `Celery6.0` 版本在稳定性远不如 `Celery4.0`,所以在使用不同版本的时候,系统给到我们的反馈可能并不能如我们所愿。

/3 服务/

 在 `windows` 下挂在 `Celery` 服务有时候会出现不稳定的情况(`unix`中暂时未发现这种情况),比如在执行定时任务的时候,过了一段时间之后,`Celery` 出现了假死状态,以至于不能按照我们指定的时间点去执行任务。

这些任务只是加入到待运行队列中(堆积在 Redis 中),只能人为重启 Celery 服务之后才能将堆积的任务释放出来运行。

这样一来,第一是定时任务在指定时间点没有正常运行,其二是在其他时间运行了这些任务,很可能会产生更新数据不及时,时间节点混乱的问题,不仅达不到业务需求,还会反受其害。

/4 设置心跳/

为了解决 Celerywindows 中的这种弊端,可以为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务就不会出现假死状态。

/5 举个栗子/

我总是很喜欢用示例来说话,前些时间在对某平台的商家后台进行数据采集的时候,为了使用时能自动获取该网站的 cookie

Pyppeteer 写了一个自动化登陆的脚本,和往常一样仍在 Celery 队列中并迅速的启动服务。

脚本是这样的(非常接近实际的伪代码,没办法,保命要紧)

# -*- coding: utf-8 -*-
from db.redisCurd import RedisQueue
import asyncio
import random
import tkinter
from pyppeteer.launcher import launch
from platLogin.config import USERNAME, PASSWORD, LOGIN_URL

class Login():
    def __init__(self, shopId):
        self.shopId = shopId
        self.RedisQueue = RedisQueue("cookie")

    def screen_size(self):
        tk = tkinter.Tk()
        width = tk.winfo_screenwidth()
        height = tk.winfo_screenheight()
        tk.quit()
        return {'width': width, 'height': height}

    async def login(self, username, password, url):
        browser = await launch(
            {
                'headless': False,
                'dumpio': True
            },
            args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],
        )
        page = await browser.newPage()  # 启动新的浏览器页面

        try:
            await page.setViewport(viewport=self.screen_size())
            await page.setJavaScriptEnabled(enabled=True)  # 启用js
            await page.setUserAgent(
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
            )
            await self.page_evaluate(page)
            await page.goto(url)
            await asyncio.sleep(2)
            # 输入用户名,密码
            await page.evaluate(f'document.querySelector("#userName").value=""')
            await page.type('#userName', username, {'delay': self.input_time_random() - 50})  # delay是限制输入的时间
            await page.evaluate('document.querySelector("#passWord").value=""')
            await page.type('#passWord', password, {'delay': self.input_time_random()})
            await page.waitFor(6000)

            loginImgVcode = await page.waitForSelector('#checkCode')  
            await loginImgVcode.screenshot({'path': './loginImg.png'})
            await page.waitFor(6000)

            res = use_cjy("./loginImg.png")
            pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"

            await page.waitFor(6000)
            await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
            await page.waitFor(6000)

            await page.click('#subMit')
            await page.waitFor(6000)
            await asyncio.sleep(2)
            await self.get_cookie(page)
            await page.waitFor(3000)
            await self.page_close(browser)
            return {'code': 200, 'msg': '登陆成功'}
        except:
            return {'code': -1, 'msg': '出错'}

        finally:
            await page.waitFor(3000)
            await self.page_close(browser)

    # 获取登录后cookie
    async def get_cookie(self, page):
        cookies_list = await page.cookies()
        cookies = ''
        for cookie in cookies_list:
            str_cookie = '{0}={1}; '
            str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
            cookies += str_cookie
        # 将cookie 放入 cookie 池
        self.RedisQueue.put_hash(self.shopId, cookies)
        return cookies

    async def page_evaluate(self, page):
        await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
        await page.evaluate(
            '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
        await page.waitFor(3000)

    async def page_close(self, browser):
        for _page in await browser.pages():
            await _page.close()
        await browser.close()

    def input_time_random(self):
        return random.randint(100, 151)

    def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
        loop = asyncio.get_event_loop()
        i_future = asyncio.ensure_future(self.login(username, password, url))
        loop.run_until_complete(i_future)
        return i_future.result()


if __name__ == '__main__':
    Z = Login(shopId="001")
    Z.run()

Celery 任务文件是这样的


# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
import sys
import time
from db.redisCurd import RedisQueue
from send_msg.weinxin import Send_msg
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from logger.logger import log_v
from celery import Task
from platLogin.login import Login  # 登陆类
from celery import Celery

randomQueue = RedisQueue("cookie")

celery_app = Celery('task')
celery_app.config_from_object('celeryConfig')

S = Send_msg()

dl_dict = {
    'demo': {
        'cookie': '',
        'loginClass': 'Login',
    }
}

# todo 这是三种运行的状态
class task_status(Task):
    def on_success(self, retval, task_id, args, kwargs): 
        log_v.info('任务信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))

    def on_failure(self, exc, task_id, args, kwargs, einfo):  
        log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))

    def on_retry(self, exc, task_id, args, kwargs, einfo): 
        log_v.warning('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc))


# todo 随便找个hash key作为轮询对象, celery在win10系统可能不太稳定,有时候会有连接断开的情况
@celery_app.task(base=task_status)
def get_cookie_status(platName="demo"):
    try:
        # log_v.debug(f'[+] 轮询 {platName} 定时器启动 ..... Done')
        randomQueue.get_hash(platName).decode()
        log_v.debug(f'[+] 轮询 {platName} 成功 ..... Done')
        return "Erp 轮询成功"
    except:
        return "Erp 轮询失败"


@celery_app.task(base=task_status)
def set_plat_cookie(platName="demo", shopId=None):
    log_v.debug(f"[+] {platName} 正在登陆")
    core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
    result = core.run()
    return result

Celery 配置文件是这样的

from __future__ import absolute_import
import datetime
from kombu import Exchange, Queue
from celery.schedules import crontab
from urllib import parse

BROKER_URL = f'redis://root:{parse.quote("你的不规则密码")}@主机:6379/15'

# 导入任务,如tasks.py
CELERY_IMPORTS = ('monitor.tasks',)

# 列化任务载荷的默认的序列化方式
CELERY_TASK_SERIALIZER = 'json'

# 结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'

CELERYBEAT_SCHEDULE = {
    'add-every-60-seconds': {
        'task': 'tasks.get_cookie_status',
        'schedule': datetime.timedelta(minutes=1),  # 每 1 分钟执行一次
        'args': ()  # 任务函数参数
    },
}

启动服务

celery -A tasks beat -l INFO
celery -A tasks worker -l INFO -c 2

以 2 个线程启动消费者队列服务并启用定时任务,当发现当前平台的 cookie 不可用时,我会向 Celery 发送一个信号(就是调用了前面的set_plat_cookie 这个方法),消费者得到这个任务这个就会执行自动化脚本以获取 cookie 并储存在 Redis 中,使用时在从 Redis 中获取就能正常请求到该平台的数据。

在空闲时间,Celery中的 get_cookie_status 方法会每隔一分钟向 Redis 请求数据,这就是我们设置的 1分钟心跳。

这样不管我们的 Celery 是否是后台启动,都不会出现假死、卡死的状态,则万事大吉矣!!

/6 总结/

本文为了解决 Celerywindows 中的这种弊端,为 Celery 任务队列设置一个心跳时间,比如每一分钟或者每五分钟向 Redis 数据库发送一次数据以保证队列始终是活跃的状态,这样只要你的电脑不关机并保持网络畅通(如果是远程 Redis),Celery 任务队列服务都不会出现假死、卡死的状态。

**-----**------**-----**---**** End **-----**--------**-----**-****

****手把手教你在Windows下设置分布式队列Celery的心跳轮询****

往期精彩文章推荐:

手把手教你在Windows下设置分布式队列Celery的心跳轮询

欢迎各位大佬点击链接加入群聊【helloworld开发者社区】:https://jq.qq.com/?_wv=1027&k=mBlk6nzX进群交流IT技术热点。

本文转自 https://mp.weixin.qq.com/s/qi9q8qwcySEQbb51aEPXTQ,如有侵权,请联系删除。

点赞
收藏
评论区
推荐文章
浅梦一笑 浅梦一笑
2个月前
初学 Python 需要安装哪些软件?超级实用,小白必看!
编程这个东西是真的奇妙。对于懂得的人来说,会觉得这个工具是多么的好用、有趣,而对于小白来说,就如同大山一样。其实这个都可以理解,大家都是这样过来的。那么接下来就说一下python相关的东西吧,并说一下我对编程的理解。本人也是小白一名,如有不对的地方,还请各位大神指出01名词解释:如果在编程方面接触的比较少,那么对于软件这一块,有几个名词一定要了解,比如开发环
Jacquelyn38 Jacquelyn38
1年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Stella981 Stella981
1年前
Celery分布式任务队列的认识和基本操作
一、简单认识  Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。它的特点有:简单:熟悉了它的流程后,配置使用简单;高可用
Easter79 Easter79
1年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
1年前
Python celery简介
Celery异步分布式什么是celery?他是一个python开发的异步分布式任务调度模块celery本身不提供消息服务,使用第三方服务,也就是broker来传递任务,目前支持rabbitmq,redis,数据库等等。我们使用redis连接URL的格式为:redis://:password@hostname:port/
Wesley13 Wesley13
1年前
MySQL查询按照指定规则排序
1.按照指定(单个)字段排序selectfromtable_nameorderiddesc;2.按照指定(多个)字段排序selectfromtable_nameorderiddesc,statusdesc;3.按照指定字段和规则排序selec
Stella981 Stella981
1年前
Celery简单说明以及在Django中的配置
Celery1.什么是CleleryCelery是一个简单、灵活且可靠的,处理大量消息的分布式系统专注于实时处理的异步任务队列同时也支持任务调度Celery架构Celery的架构由三部分组成,消息中间件(messagebroker),任务执行单元(worker)和任务执行结果存储(taskresu
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
helloworld_34035044 helloworld_34035044
5个月前
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为