php+crontab+shell方案实现的秒级定时发起异步请求回调方案

LogicAegis
• 阅读 5643

方案介绍

该方案出来的场景:一天有一个业务需求,需要把我方的一些信息或订单状态等异步发起请求同步给第三方,这里就会出现定时时间和延迟时间消息的处理,考虑过很多消息队列方案(如:rabbitmq、云消息服务等)。

不过最后公司定了因为该业务流量很小,不用做那么麻烦。所以就直接出了这个方案

该方案在50条消息/s,应该压力不大,量大了就会出现一个消息延迟问题,如果不注重这个业务时间准确性,该方案承载的秒级处理在1000内应该问题也不大

方案架构图

php+crontab+shell方案实现的秒级定时发起异步请求回调方案

mysql的任务队列表

CREATE TABLE `open_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `type` tinyint(4) unsigned NOT NULL DEFAULT '0' COMMENT '任务类型,可用于后续读取不同任务失败次数区分',
  `order_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '业务绑定的订单id等,根据自己自行设计业务id',
  `event_identity` varchar(30) NOT NULL DEFAULT '' COMMENT '事件表示分类,可不用,同type',
  `data` text NOT NULL COMMENT '需要处理的数据,json格式化',
  `try` tinyint(4) unsigned NOT NULL DEFAULT '1' COMMENT '特定任务需要当时就处理的次数,而不是发起回调请求的任务',
  `again` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '0不是重试记录 1重试记录,失败后发起任务未1,否者未0',
  `status` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否被执行 1是 0否',
  `create_time` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '任务创建的时间戳',
  `at_time` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '任务时间的时间戳',
  `task_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '执行结果 -1重复消息系统主动取消 0未执行 1执行成功 2执行失败',
  `task_time` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '执行时间',
  PRIMARY KEY (`id`),
  KEY `IDX_EVENT` (`event_identity`),
  KEY `IDX_AT_TIME` (`at_time`) USING BTREE,
  KEY `IDX_ORDER` (`order_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

方案实现源码

下方代码都是伪代码,大家自行根据思路自己设计业务

redis同步到mysql

该步骤可以省略,因为我方当时redis是单机部署,也没做持久化,

$redisTypeLock; //lock类型
if (!IS_WIN) {
    $file = fopen($path . "/lock/{$redisTypeLock}.lock","w+");
    if (!flock($file, LOCK_EX | LOCK_NB)) {
        flock($file,LOCK_UN);
        fclose($file);
        exit;
    }
}

//成功获得锁 开始业务执行

$count = get_redis_lLen('msgevent:' . $redisType);
if (!$count) {
    //无需要入数据库订单队列,直接返回
    return false;
}

$successCount = 0;
for ($i = 0; $i < $loopLen; $i++) {
    $data = get_redis_lPop('msgevent:' . $redisType);
    //把redis数据格式化存入到数据库持久化
    $result = $this->saveToMysql($data);
    if (!$result) {
        //如果执行失败,重新推入redis队列
        get_redis_lPush('msgevent:' . $redisType, $data);
    }

    $successCount++;
}

if (!IS_WIN) {
    flock($file, LOCK_UN);
    fclose($file);
}

定时执行任务的的入口

这个入口是定时秒级处理脚本和处理小于当前时间的脚本调用的入口

public function task_run()
{
    $timeType = $_GET['tt'] ? : 'now';
    //获取当前秒需要处理的所有消息, 依次路由后执行
    //如果你redis无需存入mysql,你可以redis的list结构实现, 一次全部取出数据集合,并删除list
    //如果接下去业务里发现发起请求失败了,重新把任务分配给redis建立恢复list,list名为t+需要什么时间执行的时间戳
    $lists = $this->getTaskLists($timeType);
    foreach ($lists as $item) {
        $this->task('event:' . $item['event_type']);
        //$this->task('event:demo1');
        //$this->task('event:demo2');
        //$this->task('event:demo3');
    }
}

实际处理业务(发起通知请求)

# 模拟一个发起的请求事件demo
private function pushDemo1Event($info)
{
    $sendInfoData = $this->getSendInfoFromInfo($info);

    //发起请求,这里做的真正的发起给对方的请求,你可以根据$sendInfo拼接各种事件或消息分类给对方
    $isSuccess = $this->sendInfo($sendInfoData);

    $data = array(
        'status'        => 1,
        'task_status'   => $isSuccess ? 1 : 2,
        'task_time'     => NOW_TIME
    );
    
    $db->save($data);

    //如果执行失败,重新插入一条记录,并把at_time生成下次执行的时间戳,这样定时器根据at_time字段可以取出判断执行到当时的秒级
    if (!$isSuccess) {

        $tryCount = $db->where(...)->count();

        if ($tryCount >= 5) {
            return false;
        }

        $this->newTask($info, $tryCount, 30);
    }
}

private function newTask($info, $tryCount, $timeout = 30)
{
    $newTime = $this->nextTaskTime($info, $timeout);
    $data = array(
        ...
        'at_time' => newTime //执行的时间(时间戳)
    );
    $db->add($data);
}

定时消费方

$timeType = $_GET['time_type'] ? : 'now';
$lists = $this->getMsgAll($timeType);
if (!$lists) {
    return false;
}

foreach ($lists as $item) {
    if ($item['type'] == '1') {
        //根据业务生成我方真实订单
        call_user_func_array(array($this, "add_real_order"), array($item));
    }
    if (in_array($item['type'], array(2,3))) {
        $this->push('push:toengineer', $item);  //指派或取消工程师
    }
    if ($item['type'] == '5') {
        $this->push('push:edit', $item);  //编辑
    }
    if ($item['type'] == '6') {
        $this->push('push:cancel', $item);  //取消
    }
    if ($item['type'] == '7' || $item['type'] == '8' || $item['type'] == '9') {
        $this->push('push:status', $item);  //简单订单状态
    }
}

借助crontab+shell外力实现秒级执行

因为crontab是最小单位是分钟,所以需要借助shell脚本来实现秒级执行

读取redis任务队列到mysql存储

#!/bin/bash
cd /web/project/src
for (( i = 1; i < 60; i = i + 1 ))
do
    #执行php直接返回,不会阻塞 不要忘记最后面的 &
    $(/usr/bin/php index.php redis2mysql > /dev/null 2>&1 &)
    sleep 1
done

exit 0

处理秒级时间sh脚本

#!/bin/bash
cd /web/project/src
for (( i = 1; i < 60; i = i + 1 ))
do
    $(/usr/bin/php index.php task_run > /dev/null 2>&1 &)
    sleep 1
done

exit 0

处理秒级小于当前时间sh脚本

#!/bin/bash
cd /web/project/src
for (( i = 1; i < 60; i = i + 1 ))
do
    $(/usr/bin/php index.php task_run/tt/lt > /dev/null 2>&1 &)
    sleep 1
done

exit 0

linux下的crontab

* * * * * sh /web/project/src/shell/repair_build_order.sh
* * * * * sh /web/project/src/shell/repair_sync_second.sh
* * * * * sh /web/project/src/shell/repair_sync_lt_time.sh
点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
4年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Easter79 Easter79
4年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
4年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
4年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这