聊聊powerjob的执行机器地址

探手罗汉
• 阅读 169

本文主要研究一下powerjob的执行机器地址(designatedWorkers)

SaveJobInfoRequest

powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java

@Data
public class SaveJobInfoRequest {

    /**
     * id of the job. set null to create or non-null to update the job.
     */
    private Long id;

    //......

    /* ************************** PowerJob-worker cluster property ************************** */
    /**
     * Designated PowerJob-worker nodes. Blank value indicates that there is
     * no limit. Non-blank value means to run the corresponding machine(s) only.
     * example: 192.168.1.1:27777,192.168.1.2:27777
     */
    private String designatedWorkers;
    /**
     * Max count of PowerJob-worker nodes.
     */
    private Integer maxWorkerCount = 0;

    //......
}    
SaveJobInfoRequest定义了designatedWorkers,用于指定woker节点的地址

JobInfoDO

powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {
        @Index(name = "idx01_job_info", columnList = "appId,status,timeExpressionType,nextTriggerTime"),
})
public class JobInfoDO {


    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
    @GenericGenerator(name = "native", strategy = "native")
    private Long id;

    //......

    /**
     * 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割)
     */
    private String designatedWorkers;
    /**
     * 最大机器数量
     */
    private Integer maxWorkerCount;

    //......

}    
JobInfoDO定义了designatedWorkers,用于指定运行的机器地址

DesignatedWorkerFilter

powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java

@Slf4j
@Component
public class DesignatedWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {

        String designatedWorkers = jobInfo.getDesignatedWorkers();

        // no worker is specified, no filter of any
        if (StringUtils.isEmpty(designatedWorkers)) {
            return false;
        }

        Set<String> designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers));

        for (String tagOrAddress : designatedWorkersSet) {
            if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) {
                return false;
            }
        }

        return true;
    }

}
DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true

getSuitableWorkers

powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

    public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
        switch (dispatchStrategy) {
            case RANDOM:
                Collections.shuffle(workers);
                break;
            case HEALTH_FIRST:
                workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                break;
            default:
                // do nothing
        }

        // 限定集群大小(0代表不限制)
        if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
            workers = workers.subList(0, jobInfo.getMaxWorkerCount());
        }
        return workers;
    }
WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉

小结

powerjob的执行机器地址(designatedWorkers)用于指定该job运行的worker机器列表,DesignatedWorkerFilter会根据jobInfo.getDesignatedWorkers()来进行过滤,在指定机器列表内的返回false,否则返回true,WorkerClusterQueryService的getSuitableWorkers会为该job选出合适的worker,它会将filterWorker返回true的机器给删除掉。

点赞
收藏
评论区
推荐文章
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Wesley13 Wesley13
4年前
jmxtrans+influxdb+grafana监控zookeeper实战
序本文主要研究一下如何使用jmxtransinfluxdbgranfa监控zookeeper配置zookeeperjmx在conf目录下新增zookeeperenv.sh,并使用chmodx赋予执行权限,内容如下JMXLOCALONLYfalseJMXDISABLEfals
Easter79 Easter79
4年前
startup.com主要代码
 Su Main的主要代码startup.com的入口代码bits 16section .textorg 0Start:  jmp  RealStart   ;地址为0x20000   times 512($$$) db 0 RealStart:
Stella981 Stella981
4年前
Bootloader的结构和启动过程
CPU上电后,会在某个地址开始执行,比如MIPS结构的CPU会从0xBFC00000取第一条指令,而ARM结构的CPU则从0x00000000开始,嵌入式开发板中,需要把存储器件ROM或Flash等映射到这个地址。而Bootloader就存在这个地址的开始处,这样一上电后就会从这个地址处执行。Bootloader执行后从板子上的某个固态存储设备上将操作系统O
Wesley13 Wesley13
4年前
PIC中档单片机汇编指令详解(5)
位操作指令详述BCF数据寄存器指定位清0语法形式:BCFf,b操作数:f为数据寄存器的低7位地址(0x00~0x7F)B为数据位编号(0~7)执行时间:一个指令周期执行过程:使数据寄存器f的的b位清0状态标志影响:无说明:该指令可对任何数据寄存器的任意一个位置清0,常用于标志位的设定和清除,或者把某一管脚置成低电平。指
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
4年前
RedisTemplate读取slowlog
序本文主要研究一下如何使用RedisTemplate(lettuce类库)读取slowlogmaven<dependency<groupIdorg.springframework.boot</groupId<artifactIdspringbootstarterdata
Stella981 Stella981
4年前
Linux日志安全分析技巧
0x00前言我正在整理一个项目,收集和汇总了一些应急响应案例(不断更新中)。GitHub地址:https://github.com/Bypass007/EmergencyResponseNotes本文主要介绍Linux日志分析的技巧,更多详细信息请访问Github地址,欢迎Star。0x01日志简介Lin
Stella981 Stella981
4年前
Kafka 生产者与可靠性保证ACK(2)
生产者消息发送流程消息发送的整体流程,生产端主要由两个线程协调运行。分别是main线程和sender线程(发送线程)。在Kafka(2.6.0版本)源码中,可以看到。源码地址:kafka\clients\src\main\java\org.apache.kafka.clients.producer.KafkaProdu