A Look at Flink's AbstractNonHaServices

网络算

This article takes a look at Flink's AbstractNonHaServices.

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
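  • HighAvailabilityServices defines the getter and factory methods for the services that high availability needs: leader retrieval and leader election for the ResourceManager, Dispatcher, JobManager, and web monitor, plus the checkpoint recovery factory, the submitted job graph store, the running-jobs registry, and the BLOB store. It has two direct implementations: ZooKeeperHaServices and AbstractNonHaServices.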
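The Javadoc of close() and closeAndCleanupAllData() above asks implementations to keep closing the remaining services when one of them fails, and to report exceptions only after every service has been attempted. The following is a minimal sketch of that pattern in plain Java with no Flink dependency; CloseAllSketch and closeAll are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): closes every resource even if some fail,
// then rethrows the first failure with later failures attached as suppressed exceptions.
final class CloseAllSketch {

    private CloseAllSketch() {}

    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;              // remember the first failure
                } else {
                    first.addSuppressed(e); // keep later failures visible
                }
            }
        }
        if (first != null) {
            throw first; // report only after every resource was attempted
        }
    }
}
```

In terms of the interface, close() would release resources this way, while closeAndCleanupAllData() would additionally delete whatever the services keep in external stores.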

AbstractNonHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

public abstract class AbstractNonHaServices implements HighAvailabilityServices {
    protected final Object lock = new Object();

    private final RunningJobsRegistry runningJobsRegistry;

    private final VoidBlobStore voidBlobStore;

    private boolean shutdown;

    public AbstractNonHaServices() {
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.voidBlobStore = new VoidBlobStore();

        shutdown = false;
    }

    // ----------------------------------------------------------------------
    // HighAvailabilityServices method implementations
    // ----------------------------------------------------------------------

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneCheckpointRecoveryFactory();
        }
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneSubmittedJobGraphStore();
        }
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return runningJobsRegistry;
        }
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        synchronized (lock) {
            checkNotShutdown();

            return voidBlobStore;
        }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!shutdown) {
                shutdown = true;
            }
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        // this stores no data, so this method is the same as 'close()'
        close();
    }

    // ----------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------

    @GuardedBy("lock")
    protected void checkNotShutdown() {
        checkState(!shutdown, "high availability services are shut down");
    }

    protected boolean isShutDown() {
        return shutdown;
    }
}
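  • AbstractNonHaServices implements the parts of HighAvailabilityServices that do not involve leader election: getCheckpointRecoveryFactory (a new StandaloneCheckpointRecoveryFactory on every call), getSubmittedJobGraphStore (a new StandaloneSubmittedJobGraphStore on every call), getRunningJobsRegistry (one shared StandaloneRunningJobsRegistry), createBlobStore (one shared VoidBlobStore), close, and closeAndCleanupAllData. Each getter synchronizes on the lock object and calls checkNotShutdown(), so it fails with an IllegalStateException once the services are closed. Because these services store no external data, closeAndCleanupAllData simply calls close(). AbstractNonHaServices has two subclasses: EmbeddedHaServices and StandaloneHaServices.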
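To make the lifecycle handling concrete, here is a simplified, hypothetical model of what AbstractNonHaServices does with its lock and shutdown flag. Plain Objects stand in for the Flink types (StandaloneRunningJobsRegistry, StandaloneCheckpointRecoveryFactory), and the class name NonHaServicesSketch is illustrative only.

```java
// Hypothetical, simplified model of AbstractNonHaServices' lifecycle handling.
// Plain Objects stand in for the real Flink types.
class NonHaServicesSketch implements AutoCloseable {

    protected final Object lock = new Object();

    // Created once and shared, like runningJobsRegistry and voidBlobStore.
    private final Object runningJobsRegistry = new Object();

    private boolean shutdown;

    Object getRunningJobsRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return runningJobsRegistry; // same instance on every call
        }
    }

    Object getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();
            return new Object(); // fresh instance per call, like new StandaloneCheckpointRecoveryFactory()
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            shutdown = true; // idempotent: closing twice is harmless
        }
    }

    void closeAndCleanupAllData() {
        close(); // nothing is stored externally, so cleanup is the same as close
    }

    // Must be called while holding lock, mirroring @GuardedBy("lock") in the original.
    protected void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }
}
```

The shared-versus-fresh distinction matters: callers that fetch the running-jobs registry twice see the same state, while each caller of the checkpoint recovery factory getter gets its own object.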

EmbeddedHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

public class EmbeddedHaServices extends AbstractNonHaServices {

    private final Executor executor;

    private final EmbeddedLeaderService resourceManagerLeaderService;

    private final EmbeddedLeaderService dispatcherLeaderService;

    private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;

    private final EmbeddedLeaderService webMonitorLeaderService;

    public EmbeddedHaServices(Executor executor) {
        this.executor = Preconditions.checkNotNull(executor);
        this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
        this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
        this.jobManagerLeaderServices = new HashMap<>();
        this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
    }

    // ------------------------------------------------------------------------
    //  services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return resourceManagerLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return dispatcherLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return resourceManagerLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return dispatcherLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderRetrievalService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return webMonitorLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return webMonitorLeaderService.createLeaderElectionService();
    }

    // ------------------------------------------------------------------------
    // internal
    // ------------------------------------------------------------------------

    @GuardedBy("lock")
    private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
        EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
        if (service == null) {
            service = new EmbeddedLeaderService(executor);
            jobManagerLeaderServices.put(jobID, service);
        }
        return service;
    }

    // ------------------------------------------------------------------------
    //  shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!isShutDown()) {
                // stop all job manager leader services
                for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();

                resourceManagerLeaderService.shutdown();

                webMonitorLeaderService.shutdown();
            }

            super.close();
        }
    }
}
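  • EmbeddedHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers, and TaskManagers all run in the same process. Leadership is handled in memory by EmbeddedLeaderService instances: one each for the ResourceManager, the Dispatcher, and the web monitor, plus one per JobID that getOrCreateJobManagerService creates lazily. The leader retriever and the leader election service for a given component are both created from the same EmbeddedLeaderService instance. FlinkMiniCluster uses EmbeddedHaServices.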
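The per-job bookkeeping in EmbeddedHaServices (lazy creation under the lock, one service per job, shutdown of all job services on close) can be modeled in a few lines. This is a hypothetical sketch: InProcessLeaderServiceSketch stands in for EmbeddedLeaderService and only records whether it was shut down, and a UUID stands in for Flink's JobID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for EmbeddedLeaderService: only tracks whether it was shut down.
class InProcessLeaderServiceSketch {
    private boolean shutdown;

    void shutdown() { shutdown = true; }

    boolean isShutdown() { return shutdown; }
}

// Simplified model of EmbeddedHaServices' per-job services; a UUID stands in for JobID.
class EmbeddedHaServicesSketch {
    private final Object lock = new Object();
    private final Map<UUID, InProcessLeaderServiceSketch> jobManagerLeaderServices = new HashMap<>();
    private boolean shutdown;

    InProcessLeaderServiceSketch jobManagerLeaderService(UUID jobId) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("high availability services are shut down");
            }
            // Same effect as getOrCreateJobManagerService: one service per job, created on first use.
            return jobManagerLeaderServices.computeIfAbsent(jobId, id -> new InProcessLeaderServiceSketch());
        }
    }

    int jobCount() {
        synchronized (lock) {
            return jobManagerLeaderServices.size();
        }
    }

    void close() {
        synchronized (lock) {
            if (!shutdown) {
                for (InProcessLeaderServiceSketch service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();
                shutdown = true;
            }
        }
    }
}
```

Because the election side and the retrieval side of a job ask for the same map entry, a JobMaster winning the in-memory election and a TaskManager looking up that JobMaster talk to one shared object, with no external coordination service involved.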

StandaloneHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

public class StandaloneHaServices extends AbstractNonHaServices {

    /** The constant name of the ResourceManager RPC endpoint */
    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";

    /** The fix address of the ResourceManager */
    private final String resourceManagerAddress;

    /** The fix address of the Dispatcher */
    private final String dispatcherAddress;

    /** The fix address of the JobManager */
    private final String jobManagerAddress;

    private final String webMonitorAddress;

    /**
     * Creates a new services class for the fix pre-defined leaders.
     *
     * @param resourceManagerAddress    The fix address of the ResourceManager
     * @param webMonitorAddress
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String jobManagerAddress,
            String webMonitorAddress) {
        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
        this.webMonitorAddress = checkNotNull(webMonitorAddress, webMonitorAddress);
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }

    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

}
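  • StandaloneHaServices extends AbstractNonHaServices. It implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured leader addresses: every leader retriever is a StandaloneLeaderRetrievalService for the configured address with DEFAULT_LEADER_ID, and every leader election service is a StandaloneLeaderElectionService. Note that getJobManagerLeaderRetriever(jobID) uses the jobManagerAddress passed to the constructor, whereas getJobManagerLeaderRetriever(jobID, defaultJobManagerAddress) uses the address supplied by the caller. ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.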
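With fixed addresses there is nothing to elect, so retrieval reduces to "report the configured address together with the all-zero DEFAULT_LEADER_ID". The sketch below models that, assuming a retriever reports its address as soon as it is started; LeaderAddressListener, FixedLeaderRetrievalSketch, and StandaloneAddressesSketch are hypothetical names, not the Flink classes. It also mirrors the two getJobManagerLeaderRetriever overloads.

```java
import java.util.UUID;

// Hypothetical listener callback, loosely modeled on a leader retrieval listener.
interface LeaderAddressListener {
    void notifyLeaderAddress(String address, UUID leaderSessionId);
}

// Simplified fixed-address retrieval: no election, so the configured address is
// reported together with the all-zero default leader ID.
final class FixedLeaderRetrievalSketch {
    // Same value as HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    private final String address;

    FixedLeaderRetrievalSketch(String address) {
        this.address = address;
    }

    void start(LeaderAddressListener listener) {
        listener.notifyLeaderAddress(address, DEFAULT_LEADER_ID);
    }
}

// Mirrors the two getJobManagerLeaderRetriever overloads of StandaloneHaServices.
final class StandaloneAddressesSketch {
    private final String jobManagerAddress;

    StandaloneAddressesSketch(String jobManagerAddress) {
        this.jobManagerAddress = jobManagerAddress;
    }

    FixedLeaderRetrievalSketch jobManagerRetriever(String jobId) {
        return new FixedLeaderRetrievalSketch(jobManagerAddress); // address from the constructor
    }

    FixedLeaderRetrievalSketch jobManagerRetriever(String jobId, String defaultJobManagerAddress) {
        return new FixedLeaderRetrievalSketch(defaultJobManagerAddress); // address from the caller
    }
}
```

The job ID argument is ignored in both overloads, just as in StandaloneHaServices: every job resolves to the same fixed JobManager address unless the caller passes its own.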

Summary

  • HighAvailabilityServices defines the getter and factory methods for the services that high availability needs. It has two direct implementations: ZooKeeperHaServices and AbstractNonHaServices.
  • AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close, and closeAndCleanupAllData from HighAvailabilityServices. It has two subclasses: EmbeddedHaServices and StandaloneHaServices.
  • EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers, and TaskManagers run in the same process; FlinkMiniCluster uses it.
  • StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured addresses; ClusterEntrypoint uses it when highAvailabilityMode is NONE.
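The summary amounts to a choice driven by the configured high-availability mode and the deployment style. The following hypothetical sketch encodes that choice, returning class names as strings so it runs without Flink. Only the NONE mappings come from this article; the enum and class names are illustrative, and Flink's real selection (in HighAvailabilityServicesUtils) covers more than is shown here.

```java
// Hypothetical names throughout; only the NONE mappings are taken from the article.
enum HaModeSketch { NONE, ZOOKEEPER }

enum DeploymentSketch { CLUSTER_ENTRYPOINT, MINI_CLUSTER }

final class HaServicesChooserSketch {

    private HaServicesChooserSketch() {}

    static String choose(HaModeSketch mode, DeploymentSketch deployment) {
        switch (mode) {
            case NONE:
                // No HA: a mini cluster runs everything in one process and can use in-memory
                // leader services; a standalone cluster uses fixed, pre-configured addresses.
                return deployment == DeploymentSketch.MINI_CLUSTER
                        ? "EmbeddedHaServices"
                        : "StandaloneHaServices";
            case ZOOKEEPER:
                // Leader election and metadata go through ZooKeeper.
                return "ZooKeeperHaServices";
            default:
                throw new IllegalArgumentException("unsupported mode: " + mode);
        }
    }
}
```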

