41 动手设计并实现一个线程池下
Diego38 40 1

1. 前言

上节实现了压力感知的线程池,压力感知线程池在Jetty和Tomcat都有类似实现。

在实际业务场景中,有许多任务顺序执行,比如数据实时统计,位置上报等场景,而原生线程池时是不关心任务顺序的,只要任务被提交上来,就被分发给线程池处理,本节就创建一个支持顺序执行的线程池。

2. 实现支持位移监测的线程池

在微信等App中,有位置实时共享的功能,由于上报坐标是需要降噪、平均等要求,所以对于同一个人的坐标上传,是需要顺序执行的。同一个用户的请求落到单机上,单机对同一用户的上报请求也是顺序处理的。

本节实现一个支持位移监测的线程池。

回顾下前几节讲的ThreadPoolExecutor的原理,执行过程是这样的: image

所有线程从队列里取任务,处理无需任务,要实现支持任务顺序执行的线程池,需要满足如下执行策略,即每个线程一个队列,在执行submit或execute时,根据特定规则比如某个字段进行hash分发到具体线程。 image

写代码前,需要考虑四点

  • 支持有序线程池内的任务需要支持按一定规则进行分发

  • 线程池内的Worker需要替换成子Executor,每个Executor一个任务队列

  • 线程池销毁后,子Executor和任务队列也应该销毁

  • 最好能同时支持有序和无序

    public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    
      //每个线程对应一个Executor
      protected final ConcurrentMap<Integer, Executor> childExecutors = new ConcurrentHashMap<>();
    
      private final Integer maxQueueSize ;
    
      /**
       * 线程池状态
       */
      private final AtomicBoolean poolRunning = new AtomicBoolean();
    
    
/**
 * 可排序任务中线程直接扩容到maxPoolSize,每个Executor的queueSize=maxQueueSize/maxPoolSize
 * @param corePoolSize
 * @param maxPoolSize
 * @param keepAliveTime
 * @param timeUnit
 * @param maxQueueSize
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, Integer maxQueueSize) {
    super(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, new LinkedBlockingQueue<>(maxQueueSize));
    this.maxQueueSize = maxQueueSize;
    this.poolRunning.set(true);
}

@Override
public void execute(Runnable command) {
    if (command instanceof OrderedRunnable) {
        getChildExecutor((OrderedRunnable) command).execute(command);
        return;
    }
    //不是可排序任务,走原有流程
    super.execute(command);
}


public static interface OrderedRunnable extends Runnable {

    public Integer getDispatchKey();
}


/**
 * 可排序任务通过hash定位到具体的Executor
 * @param c
 * @return
 */
protected Executor getChildExecutor(OrderedRunnable c) {
    Integer dispatchKey = c.getDispatchKey();
    int maxPoolSize = getMaximumPoolSize();
    int index = dispatchKey % maxPoolSize;
    Executor executor  = childExecutors.get(index);
    if (executor == null) {
        int unitCapacity = maxQueueSize / maxPoolSize;
        //开启执行线程,每个Executor队列至少为1
        executor = new ChildExecutor(Math.max(unitCapacity, 1));
        Executor oldExecutor = childExecutors.putIfAbsent(index, executor);
        if (oldExecutor != null) {
            executor = oldExecutor;
        }
    }
    return executor;
}

protected void beforeExecute(Thread thread, Runnable task) {
    super.beforeExecute(thread, task);
}


protected void onAfterExecute(Runnable task, Throwable o) {
    super.afterExecute(task, o);
}

/**
 * 执行线程池退出
 */
public void shutDownNow() {
    poolRunning.compareAndSet(true, false);
    childExecutors.clear();
}


/**
 * 一个线程对应一个ChildExecutor,放弃原生线程池中的Woker,另起炉灶
 */
public final class ChildExecutor implements Executor, Runnable {
    private final Queue<Runnable> tasks ;
    private final AtomicBoolean isRunning = new AtomicBoolean();

    public ChildExecutor(Integer capacity) {
        tasks = new LinkedBlockingQueue<>(capacity);
    }

    public void execute(Runnable command) {
        tasks.add(command);
        if (!isRunning.get()) {
            //调用非有序execute,将有序execute任务提交到原生线程池
            doExecute(this);
        }
    }

    /**
     * 类似于原生线程池中的Worker.run
     */
    public void run() {
        if (isRunning.compareAndSet(false, true)) {
            try {
                Thread thread = Thread.currentThread();
                for (; ; ) {
                    if (!poolRunning.get()) {
                        return;
                    }
                    final Runnable task = tasks.poll();
                    if (task == null) {
                        break;
                    }
                    boolean ran = false;
                    beforeExecute(thread, task);
                    try {
                        task.run();
                        ran = true;
                        onAfterExecute(task, null);
                    } catch (RuntimeException e) {
                        if (!ran) {
                            onAfterExecute(task, e);
                        }
                        throw e;
                    }
                }
            } finally {
                isRunning.set(false);
            }

        }
    }
}

private void doExecute(ChildExecutor childExecutor) {
    super.execute(childExecutor);
}

public static void main(String[] args) {
    OrderedThreadPoolExecutor threadPoolExecutor = new OrderedThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, 20000);
    for (int i = 0; i < 10000; i++) {
        int finalI = i;
        OrderedRunnable task = new OrderedRunnable() {
            @Override
            public Integer getDispatchKey() {
                return finalI % 20;
            }

            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "执行了"  + finalI + "它的DispatchKey为" + finalI % 20);
            }
        };
        threadPoolExecutor.execute(task);
    }
}

}

main方法作为测试验证的用例,输出的结果如下:
```java
...
线程pool-1-thread-4执行了4734它的DispatchKey为14
线程pool-1-thread-4执行了4744它的DispatchKey为4
线程pool-1-thread-4执行了4754它的DispatchKey为14
线程pool-1-thread-4执行了4764它的DispatchKey为4
线程pool-1-thread-4执行了4774它的DispatchKey为14
线程pool-1-thread-4执行了4784它的DispatchKey为4
线程pool-1-thread-9执行了5909它的DispatchKey为9
线程pool-1-thread-9执行了5919它的DispatchKey为19
...

我们随便挑一个DispatchKey为14的任务执行,发现每次都是来自于线程pool-1-thread-4,意味着我们的代码是正确的,同一个DispatchKey被同一个线程执行。

3. 总结

支持有序执行的线程池在数据处理和监控是很有用的,比如以10秒一个窗口,将同一个机器平均的入口流量统计上报到监控平台,如果在多线程内处理,由于同一时间一个服务器上的入口流量指标会被多线程消费,要做统计记数是很难的,因此OrderedThreadPoolExecutor有一定的使用场景,netty中也有类似的实现,感兴趣同学可以看下netty3.x的OrderedMemoryAwareThreadPoolExecutor。

预览图
评论区

索引目录