A Look at PowerJob's ProcessorTracker

字节领航

This article takes a look at PowerJob's ProcessorTracker.

ProcessorTracker

tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java

@Slf4j
public class ProcessorTracker {

    /**
     * Creation time
     */
    private long startTime;
    private WorkerRuntime workerRuntime;
    /**
     * Task instance info
     */
    private InstanceInfo instanceInfo;
    /**
     * Redundant copy of instanceId, for convenient logging
     */
    private Long instanceId;

    private ProcessorBean processorBean;
    /**
     * Online logger
     */
    private OmsLogger omsLogger;
    /**
     * Retry queue for ProcessResult reports that failed to be sent
     */
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    /**
     * Last idle time (used to detect idleness)
     */
    private long lastIdleTime;
    /**
     * Completed task count at the last check (used to detect idleness)
     */
    private long lastCompletedTaskCount;

    private String taskTrackerAddress;

    private ThreadPoolExecutor threadPool;

    private ScheduledExecutorService timingPool;

    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
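    /**
     * A ProcessorTracker that stays idle for a long time requests its own destruction
     */
    private static final long MAX_IDLE_TIME = 120000;
    /**
     * Set when the ProcessorTracker hits a fatal error (e.g. the Processor cannot be created, so every task fails immediately)
     */
    private boolean lethal = false;

    private String lethalReason;

    /**
     * Creates the ProcessorTracker (which really just means creating a thread pool for execution T_T)
     */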
     */
    @SuppressWarnings("squid:S1181")
    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
        try {
            // Assign fields
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceInfo = request.getInstanceInfo();
            this.instanceId = request.getInstanceInfo().getInstanceId();
            this.taskTrackerAddress = request.getTaskTrackerAddress();

            this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;

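            // Initialize the thread pool first: the job started by TimingPool inspects the ThreadPool, so it must already exist, otherwise an NPE is thrown
            initThreadPool();
            // Initialize the scheduled job
            initTimingJob();
            // Initialize the Processor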
            processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(instanceInfo.getProcessorType()).setProcessorInfo(instanceInfo.getProcessorInfo()));
            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
        } catch (Throwable t) {
            log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
            lethal = true;
            lethalReason = ExceptionUtils.getMessage(t);
        }
    }

    //......
}    
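ProcessorTracker takes a TaskTrackerStartTaskReq and then initializes the thread pool, the scheduled job and processorBean. If any of these steps throws, the constructor catches the Throwable and marks the tracker as lethal, so every task submitted to it fails. Beyond construction, it provides the submitTask and destroy methods.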
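The constructor never propagates an exception: whatever goes wrong is caught and recorded, and tasks sent to the tracker then fail fast. The sketch below isolates that pattern under stated assumptions: `LethalTracker`, its `Supplier`-based loader and the status strings are hypothetical stand-ins, not PowerJob classes.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "lethal" pattern used by ProcessorTracker's constructor.
class LethalTracker {
    private boolean lethal = false;
    private String lethalReason;
    private Runnable processor;

    LethalTracker(Supplier<Runnable> processorLoader) {
        try {
            // Loading may throw; the constructor itself never does.
            this.processor = processorLoader.get();
        } catch (Throwable t) {
            lethal = true;
            lethalReason = String.valueOf(t.getMessage());
        }
    }

    // Fails the task immediately when the tracker is lethal, otherwise runs the processor.
    String submitTask(String taskId) {
        if (lethal) {
            return "FAILED(" + taskId + "): " + lethalReason;
        }
        processor.run();
        return "OK(" + taskId + ")";
    }

    public static void main(String[] args) {
        LethalTracker good = new LethalTracker(() -> () -> { });
        LethalTracker bad = new LethalTracker(() -> {
            throw new IllegalStateException("fetch Processor failed");
        });
        System.out.println(good.submitTask("t1")); // OK(t1)
        System.out.println(bad.submitTask("t2"));  // FAILED(t2): fetch Processor failed
    }
}
```

Because creation cannot throw, the caller's get-or-create path stays simple, and the failure surfaces per task instead.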

initThreadPool

    /**
     * Initialize the thread pool
     */
    private void initThreadPool() {

        int poolSize = calThreadPoolSize();
        // Queue of pending tasks; kept small so it does not put heavy pressure on memory
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
        // Custom thread names for the pool (PowerJob Processor Pool -> PPP)
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
        // Rejection policy: throw an exception directly
        RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();

        threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);

        // Allow core threads to be destroyed when no task is running (so the pool may end up with 0 live threads)
        threadPool.allowCoreThreadTimeOut(true);
    }

    /**
     * Compute the thread pool size
     */
    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());

        // Script processors come with their own thread pool, but to keep the logic simple a single token thread is still allocated
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }

        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return instanceInfo.getThreadConcurrency();
        }
        if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
            return instanceInfo.getThreadConcurrency();
        }
        return 2;
    }    
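initThreadPool creates a ThreadPoolExecutor whose core and maximum sizes both come from calThreadPoolSize: 1 for PYTHON/SHELL processors, threadConcurrency for MAP/MAP_REDUCE jobs or jobs whose time expression type is in TimeExpressionType.FREQUENT_TYPES, and 2 otherwise. Its work queue is an ArrayBlockingQueue of capacity 128 (THREAD_POOL_QUEUE_MAX_SIZE), so once all threads are busy and 128 tasks are queued, AbortPolicy rejects further submissions with a RejectedExecutionException. allowCoreThreadTimeOut(true) lets idle threads exit after 60 seconds.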
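To make the sizing rule and the bounded queue concrete, here is a minimal sketch that mirrors calThreadPoolSize and initThreadPool with plain JDK classes. It assumes strings in place of PowerJob's ProcessorType/ExecuteType enums and a boolean `frequent` in place of the FREQUENT_TYPES check; `PoolSizing` and `countRejected` are hypothetical helpers that show AbortPolicy kicking in once `poolSize` threads are busy and 128 tasks are queued.

```java
import java.util.concurrent.*;

// Hypothetical re-creation of calThreadPoolSize + initThreadPool (strings instead of PowerJob enums).
class PoolSizing {
    static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;

    static int calThreadPoolSize(String processorType, String executeType,
                                 boolean frequent, int threadConcurrency) {
        if (processorType.equals("PYTHON") || processorType.equals("SHELL")) {
            return 1; // script processors get a single token thread
        }
        if (executeType.equals("MAP") || executeType.equals("MAP_REDUCE") || frequent) {
            return threadConcurrency;
        }
        return 2;
    }

    static ThreadPoolExecutor newPool(int poolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE), new ThreadPoolExecutor.AbortPolicy());
        pool.allowCoreThreadTimeOut(true); // idle threads may exit, leaving 0 live threads
        return pool;
    }

    // Submits n blocking tasks and counts how many AbortPolicy rejects.
    static int countRejected(int poolSize, int n) {
        ThreadPoolExecutor pool = newPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < n; i++) {
            try {
                // Each task blocks until released, so threads stay busy and the queue fills up.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        int size = calThreadPoolSize("BUILT_IN", "STANDALONE", false, 8);
        System.out.println("pool size = " + size); // pool size = 2
        System.out.println("rejected = " + countRejected(size, size + THREAD_POOL_QUEUE_MAX_SIZE + 5)); // rejected = 5
    }
}
```

With a pool of 2, the first 2 tasks occupy the threads, the next 128 fill the queue, and every further submission is rejected, which is why the submitter must be prepared for RejectedExecutionException.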

initTimingJob

    /**
     * Initialize the scheduled job
     */
    private void initTimingJob() {

        // PowerJob Processor TimingPool
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
        timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);

        timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
    }
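initTimingJob creates a single-threaded ScheduledExecutorService via Executors.newSingleThreadScheduledExecutor (threads named PPT-%d) and runs CheckerAndReporter at a fixed rate: immediately, then every 10 seconds.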
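The body of CheckerAndReporter is not shown here. Judging only from the lastIdleTime, lastCompletedTaskCount and MAX_IDLE_TIME fields, an idle check could look roughly like the following; `IdleChecker` and its exact rule are assumptions, not PowerJob's implementation. The `main` method reuses the single-threaded `scheduleAtFixedRate` shape of initTimingJob, with a 10 ms period instead of 10 s so it finishes quickly.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idle detector inferred from ProcessorTracker's fields; not PowerJob's CheckerAndReporter.
class IdleChecker {
    static final long MAX_IDLE_TIME = 120_000L;
    private long lastIdleTime = -1L;
    private long lastCompletedTaskCount = 0L;

    // Returns true once the pool has done no work for longer than MAX_IDLE_TIME.
    boolean shouldRequestDestroy(int activeCount, long completedTaskCount, long nowMillis) {
        if (activeCount > 0 || completedTaskCount > lastCompletedTaskCount) {
            // Work happened since the last check: reset the idle timer.
            lastIdleTime = -1L;
            lastCompletedTaskCount = completedTaskCount;
            return false;
        }
        if (lastIdleTime == -1L) {
            lastIdleTime = nowMillis; // first idle observation
            return false;
        }
        return nowMillis - lastIdleTime > MAX_IDLE_TIME;
    }

    public static void main(String[] args) throws InterruptedException {
        // Same scheduling shape as initTimingJob, with a much shorter period for the demo.
        ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        timingPool.scheduleAtFixedRate(runs::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(100);
        timingPool.shutdownNow();
        System.out.println("checker ran " + runs.get() + " times");
    }
}
```

Passing the current time in as a parameter keeps the rule deterministic and easy to test, while the scheduled executor only decides how often it is evaluated.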

ProcessorTrackerActor

tech/powerjob/worker/actors/ProcessorTrackerActor.java

@Slf4j
@Actor(path = RemoteConstant.WPT_PATH)
public class ProcessorTrackerActor {

    private final WorkerRuntime workerRuntime;

    public ProcessorTrackerActor(WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
    }

    /**
     * Handles task execution requests from the TaskTracker
     * @param req the request
     */
    @Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
    public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {

        Long instanceId = req.getInstanceInfo().getInstanceId();

        // Creating a ProcessorTracker always succeeds (its constructor catches every Throwable)
        ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
                instanceId,
                req.getTaskTrackerAddress(),
                () -> new ProcessorTracker(req, workerRuntime));

        TaskDO task = new TaskDO();

        task.setTaskId(req.getTaskId());
        task.setTaskName(req.getTaskName());
        task.setTaskContent(req.getTaskContent());
        task.setFailedCnt(req.getTaskCurrentRetryNums());
        task.setSubInstanceId(req.getSubInstanceId());

        processorTracker.submitTask(task);
    }

    /**
     * Handles stop-instance requests from the TaskTracker
     * @param req the request
     */
    @Handler(path = RemoteConstant.WPT_HANDLER_STOP_INSTANCE)
    public void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {

        Long instanceId = req.getInstanceId();
        List<ProcessorTracker> removedPts = ProcessorTrackerManager.removeProcessorTracker(instanceId);
        if (!CollectionUtils.isEmpty(removedPts)) {
            removedPts.forEach(ProcessorTracker::destroy);
        }
    }
}
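ProcessorTrackerActor provides onReceiveTaskTrackerStartTaskReq to handle start-task requests: it gets or creates the ProcessorTracker for the instance, builds a TaskDO from the request and calls processorTracker.submitTask. It also provides onReceiveTaskTrackerStopInstanceReq, which removes the instance's ProcessorTrackers and calls destroy on each of them.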
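ProcessorTrackerManager itself is not shown. A minimal sketch of the get-or-create and remove semantics the actor relies on, assuming one tracker per (instanceId, taskTrackerAddress) pair, might look like this. `TrackerRegistry` is hypothetical; it uses `ConcurrentHashMap.computeIfAbsent`, which applies the creator at most once per key, so concurrent start-task requests for the same pair share one tracker.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry modelled on how ProcessorTrackerManager is used above:
// one tracker per (instanceId, taskTrackerAddress), created lazily.
class TrackerRegistry<T> {
    private final Map<Long, Map<String, T>> instanceId2Trackers = new ConcurrentHashMap<>();

    T getOrCreate(Long instanceId, String taskTrackerAddress, Supplier<T> creator) {
        return instanceId2Trackers
                .computeIfAbsent(instanceId, id -> new ConcurrentHashMap<>())
                .computeIfAbsent(taskTrackerAddress, addr -> creator.get());
    }

    // Removes every tracker of the instance; the caller is expected to destroy them.
    List<T> remove(Long instanceId) {
        Map<String, T> removed = instanceId2Trackers.remove(instanceId);
        return removed == null ? Collections.<T>emptyList() : new ArrayList<>(removed.values());
    }

    public static void main(String[] args) {
        TrackerRegistry<String> registry = new TrackerRegistry<>();
        String first = registry.getOrCreate(1L, "10.0.0.1:27777", () -> "pt-A");
        String second = registry.getOrCreate(1L, "10.0.0.1:27777", () -> "pt-B");
        System.out.println(first + " " + second); // pt-A pt-A
        System.out.println(registry.remove(1L));  // [pt-A]
    }
}
```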

getSuitableWorkers

tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

    public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
        switch (dispatchStrategy) {
            case RANDOM:
                Collections.shuffle(workers);
                break;
            case HEALTH_FIRST:
                workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                break;
            default:
                // do nothing
        }

        // Limit the cluster size (0 means no limit)
        if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
            workers = workers.subList(0, jobInfo.getMaxWorkerCount());
        }
        return workers;
    }

    private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
        ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
        if (clusterStatusHolder == null) {
            log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
            return Collections.emptyMap();
        }
        return clusterStatusHolder.getAllWorkers();
    }    
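WorkerClusterQueryService provides getSuitableWorkers, which finds suitable workers for a JobInfoDO. It first looks up the ClusterStatusHolder for the appId and takes all workers of that cluster, then removes the workers rejected by filterWorker (DesignatedWorkerFilter, DisconnectedWorkerFilter, SystemMetricsWorkerFilter). The remaining workers are ordered by the dispatch strategy (shuffled for RANDOM, sorted by descending system-metrics score for HEALTH_FIRST), and finally the list is truncated to maxWorkerCount when that value is greater than 0.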
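The selection pipeline (filter, order by dispatch strategy, truncate) can be reduced to the following sketch. `Worker`, its `score` and `online` fields, and the single predicate standing in for the three filters are hypothetical simplifications. The sort uses `Integer.compare` rather than subtraction, which yields the same descending order without any risk of int overflow.

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified model of getSuitableWorkers; Worker and the predicate are illustrative only.
class WorkerSelection {
    static final class Worker {
        final String address;
        final int score;       // stand-in for SystemMetrics.calculateScore()
        final boolean online;  // stand-in for what DisconnectedWorkerFilter checks

        Worker(String address, int score, boolean online) {
            this.address = address;
            this.score = score;
            this.online = online;
        }
    }

    static List<String> select(List<Worker> all, Predicate<Worker> shouldFilter,
                               String dispatchStrategy, int maxWorkerCount) {
        List<Worker> workers = new ArrayList<>(all);
        workers.removeIf(shouldFilter); // like filterWorker(workerInfo, jobInfo)
        if ("RANDOM".equals(dispatchStrategy)) {
            Collections.shuffle(workers);
        } else if ("HEALTH_FIRST".equals(dispatchStrategy)) {
            // Highest score first.
            workers.sort((o1, o2) -> Integer.compare(o2.score, o1.score));
        }
        // 0 means no limit.
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            workers = workers.subList(0, maxWorkerCount);
        }
        return workers.stream().map(w -> w.address).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Worker> all = Arrays.asList(
                new Worker("a", 10, true),
                new Worker("b", 30, true),
                new Worker("c", 50, false),
                new Worker("d", 20, true));
        // Drop offline workers, prefer healthy ones, keep at most 2.
        System.out.println(select(all, w -> !w.online, "HEALTH_FIRST", 2)); // [b, d]
    }
}
```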

Summary

ProcessorTracker takes a TaskTrackerStartTaskReq and initializes the thread pool, the scheduled job and processorBean; it provides the submitTask and destroy methods. ProcessorTrackerActor provides onReceiveTaskTrackerStartTaskReq to handle start-task requests: it gets or creates the ProcessorTracker and then calls processorTracker.submitTask. On the server side, DispatchService's dispatch method calls getSuitableWorkers to determine the allWorkerAddress of the ServerScheduleJobReq, and HeavyTaskTracker finally dispatches tasks according to ProcessorTrackerStatusHolder's getAvailableProcessorTrackers. If a job's processor bean does not exist on some worker, loading it fails with PowerJobException: fetch Processor failed, please check your processorType and processorInfo config, and the taskStatus in ptReportTask is WORKER_PROCESS_FAILED.
