Hadoop:撰寫 YARN 應用程式


本文檔以高階方式描述如何為 YARN 實作新的應用程式。


一般概念是應用程式提交客戶端會將應用程式提交到 YARN ResourceManager (RM)。這可以透過設定 YarnClient 物件來完成。在 YarnClient 啟動後,客戶端接著可以設定應用程式內容,準備包含ApplicationMaster (AM) 的應用程式的第一個容器,然後提交應用程式。您需要提供資訊,例如應用程式執行所需的本機檔案/jar 檔案的詳細資料、需要執行的實際指令 (加上必要的命令列引數)、任何作業系統環境設定 (選用) 等。實際上,您需要描述需要為您的 ApplicationMaster 啟動的 Unix 程序。

YARN ResourceManager 隨後會在已配置的容器上啟動 ApplicationMaster(如指定)。ApplicationMaster 與 YARN 集群進行通訊,並處理應用程式執行。它以非同步方式執行作業。在應用程式啟動期間,ApplicationMaster 的主要任務為:a) 與 ResourceManager 通訊,以協商和配置未來容器的資源,以及 b) 在容器配置後,與 YARN NodeManager(NM)通訊,以在其中啟動應用程式容器。任務 a) 可透過 AMRMClientAsync 物件非同步執行,並在 AMRMClientAsync.CallbackHandler 類型的事件處理常式中指定事件處理方法。必須將事件處理常式明確設定為客戶端。任務 b) 可透過啟動可執行物件來執行,然後在配置容器時啟動容器。作為啟動此容器的一部分,AM 必須指定包含啟動資訊(例如命令列規格、環境等)的 ContainerLaunchContext

在應用程式執行期間,ApplicationMaster 會透過 NMClientAsync 物件與 NodeManager 通訊。所有容器事件均由與 NMClientAsync 關聯的 NMClientAsync.CallbackHandler 處理。典型的回呼處理常式會處理客戶端啟動、停止、狀態更新和錯誤。ApplicationMaster 也會透過處理 AMRMClientAsync.CallbackHandlergetProgress() 方法,向 ResourceManager 回報執行進度。

除了非同步客戶端之外,還有某些工作流程的同步版本(AMRMClientNMClient)。建議使用非同步客戶端,因為(主觀上)用法較為簡單,而且本文將主要涵蓋非同步客戶端。請參閱 AMRMClientNMClient 以取得有關同步客戶端的更多資訊。



  • 客戶端<-->ResourceManager

    透過使用 YarnClient 物件。

  • ApplicationMaster<-->ResourceManager

    透過使用 AMRMClientAsync 物件,並透過 AMRMClientAsync.CallbackHandler 非同步處理事件

  • ApplicationMaster<-->NodeManager

    啟動容器。透過使用 NMClientAsync 物件與 NodeManager 通訊,並透過 NMClientAsync.CallbackHandler 處理容器事件


  • YARN 應用程式的三個主要通訊協定(ApplicationClientProtocol、ApplicationMasterProtocol 和 ContainerManagementProtocol)仍保留。這三個客戶端會封裝這三個通訊協定,以提供較為簡易的 YARN 應用程式程式設計模型。

  • 在極為罕見的情況下,程式設計人員可能想要直接使用這三個通訊協定來實作應用程式。不過,請注意此類行為不再建議用於一般使用案例

撰寫簡單的 YARN 應用程式


  • 用戶端需要執行的第一步是初始化並啟動 YarnClient。

      YarnClient yarnClient = YarnClient.createYarnClient();
  • 設定用戶端後,用戶端需要建立應用程式並取得其應用程式 ID。

      YarnClientApplication app = yarnClient.createApplication();
      GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
  • YarnClientApplication 對新應用程式的回應也包含叢集資訊,例如叢集的最小/最大資源功能。這是必要的,以確保您可以正確設定應用程式主控程式將啟動的容器規格。請參閱 GetNewApplicationResponse 以取得更多詳細資料。

  • 用戶端的主要重點是設定 ApplicationSubmissionContext,其中定義了資源管理程式啟動 AM 所需的所有資訊。用戶端需要將下列內容設定到內容中

  • 應用程式資訊:ID、名稱

  • 佇列、優先順序資訊:將提交應用程式的佇列、要指派給應用程式的優先順序。

  • 使用者:提交應用程式的使用者

  • ContainerLaunchContext:定義 AM 將啟動並執行的容器的資訊。如前所述,ContainerLaunchContext 定義執行應用程式所需的所有必要資訊,例如本機R資源(二進位檔、JAR 檔、檔案等)、E環境設定(CLASSPATH 等)、要執行的C指令和安全性Token(RECT)。

// set the application submission context
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();


// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
    localResources, null);

// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
  addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
      localResources, null);

// The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
  Path shellSrc = new Path(shellScriptPath);
  String shellPathSuffix =
      appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
  Path shellDst =
      new Path(fs.getHomeDirectory(), shellPathSuffix);
  fs.copyFromLocalFile(false, true, shellSrc, shellDst);
  hdfsShellScriptLocation = shellDst.toUri().toString();
  FileStatus shellFileStatus = fs.getFileStatus(shellDst);
  hdfsShellScriptLen = shellFileStatus.getLen();
  hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();

if (!shellCommand.isEmpty()) {
  addToLocalResources(fs, null, shellCommandPath, appId.toString(),
      localResources, shellCommand);

if (shellArgs.length > 0) {
  addToLocalResources(fs, null, shellArgsPath, appId.toString(),
      localResources, StringUtils.join(shellArgs, " "));

// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();

// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));

// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
for (String c : conf.getStrings(

// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);

// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));

for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
if (debugFlag) {

vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");

LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();

// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);

// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);

// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);

// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
  // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
  Credentials credentials = new Credentials();
  String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
  if (tokenRenewer == null | | tokenRenewer.length() == 0) {
    throw new IOException(
      "Can't get Master Kerberos principal for the RM to use as renewer");

  // For now, only getting tokens for the default file-system.
  final Token<?> tokens[] =
      fs.addDelegationTokens(tokenRenewer, credentials);
  if (tokens != null) {
    for (Token<?> token : tokens) {
      LOG.info("Got dt for " + fs.getUri() + "; " + token);
  DataOutputBuffer dob = new DataOutputBuffer();
  ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  • 完成設定程序後,用戶端就可以提交具有指定優先順序和佇列的應用程式。
// Set the priority for the application master
Priority pri = Priority.newInstance(amPriority);

// Set the queue to which this application is to be submitted in the RM

// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);

  • 此時,資源管理程式將接受應用程式,並在背景中執行配置符合所需規格的容器的程序,然後最終在配置的容器上設定並啟動 AM。

  • 用戶端有多種方式可以追蹤實際任務的進度。

  • 它可以與資源管理程式通訊,並透過 YarnClientgetApplicationReport() 方法要求應用程式的報告。
// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);


  • 一般應用程式資訊:應用程式 ID、提交應用程式的佇列、提交應用程式的使用者和應用程式的開始時間。

  • 應用程式主控程式詳細資料:AM 執行的主機、它用於偵聽用戶端要求的 RPC 埠(如果有)和用戶端與 AM 通訊所需的 Token。

  • 應用程式追蹤資訊:如果應用程式支援某種形式的進度追蹤,它可以設定一個追蹤網址,可透過 ApplicationReportgetTrackingUrl() 方法取得,讓客戶端可以查看以監控進度。

  • 應用程式狀態:ResourceManager 所看到的應用程式狀態可透過 ApplicationReport#getYarnApplicationState 取得。如果 YarnApplicationState 設定為 FINISHED,客戶端應參照 ApplicationReport#getFinalApplicationStatus 以查看應用程式任務本身的實際成功/失敗。在失敗的情況下,ApplicationReport#getDiagnostics 可能有助於更深入了解失敗原因。

  • 如果 ApplicationMaster 支援,客戶端可以直接透過從應用程式報告取得的 host:rpcport 資訊,向 AM 本身查詢進度更新。如果可用,它也可以使用從報告取得的追蹤網址。
  • 在某些情況下,如果應用程式執行時間過長或由於其他因素,客戶端可能希望終止應用程式。YarnClient 支援 killApplication 呼叫,讓客戶端可以透過 ResourceManager 向 AM 傳送終止訊號。ApplicationMaster 如果有此設計,也可能透過其 rpc 層支援中止呼叫,讓客戶端可以利用。

撰寫 ApplicationMaster (AM)

  • AM 是工作的實際擁有者。它將由 RM 啟動,並透過客戶端提供所有必要的資訊和資源,關於它受託監督和完成的工作。

  • 由於 AM 在容器中啟動,而該容器可能會(很可能會)與其他容器共用一個實體主機,因此在多租戶性質中,除了其他問題之外,它不能對可以監聽的預先設定埠等事項做出任何假設。

  • 當 AM 啟動時,會透過環境提供幾個參數給它。這些參數包括 AM 容器的 ContainerId、應用程式提交時間,以及執行 ApplicationMaster 的 NM(節點管理員)主機的詳細資料。參照 ApplicationConstants 以取得參數名稱。

  • 與 RM 的所有互動都需要 ApplicationAttemptId(如果發生失敗,每個應用程式可能有多次嘗試)。ApplicationAttemptId 可以從 AM 的容器 ID 取得。有輔助 API 可以將從環境取得的值轉換為物件。

Map<String, String> envs = System.getenv();
String containerIdString =
if (containerIdString == null) {
  // container id should always be set in the env by the framework
  throw new IllegalArgumentException(
      "ContainerId not set in the environment");
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
  • 在 AM 完全初始化自己後,我們可以啟動兩個客戶端:一個到 ResourceManager,一個到 NodeManagers。我們使用自訂的事件處理常式設定它們,我們將在本文稍後詳細討論這些事件處理常式。
  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  • AM 必須向 RM 發出心跳,以讓 RM 得知 AM 仍處於運作中。RM 的逾時到期間隔由設定值定義,可透過 YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS 存取,預設值由 YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS 定義。ApplicationMaster 需要向 ResourceManager 註冊自己,才能開始發出心跳。
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
  • 在註冊回應中,會包含最大的資源功能。您可能想使用它來檢查應用程式的要求。
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);

int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
  LOG.info("Container memory specified above max threshold of cluster."
      + " Using max value." + ", specified=" + containerMemory + ", max="
      + maxMem);
  containerMemory = maxMem;

if (containerVirtualCores > maxVCores) {
  LOG.info("Container virtual cores specified above max threshold of  cluster."
    + " Using max value." + ", specified=" + containerVirtualCores + ", max="
    + maxVCores);
  containerVirtualCores = maxVCores;
List<Container> previousAMRunningContainers =
LOG.info("Received " + previousAMRunningContainers.size()
        + " previous AM's running containers on AM registration.");
  • 根據工作需求,AM 可以要求一組容器來執行其工作。我們現在可以計算我們需要多少個容器,並要求這些容器。
List<Container> previousAMRunningContainers =
LOG.info("Received " + previousAMRunningContainers.size()
    + " previous AM's running containers on AM registration.");

int numTotalContainersToRequest =
    numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainersToRequest; ++i) {
  ContainerRequest containerAsk = setupContainerAskForRM();
  • setupContainerAskForRM() 中,以下兩件事需要一些設定
  • 資源功能:目前,YARN 支援基於記憶體的資源需求,因此要求應定義需要多少記憶體。值以 MB 定義,且必須小於叢集的最大功能,並且是最小功能的精確倍數。記憶體資源對應於施加在工作容器上的實體記憶體限制。它也將支援基於運算的資源 (vCore),如程式碼所示。

  • 優先順序:在要求一組容器時,AM 可以為每組定義不同的優先順序。例如,Map-Reduce AM 可能為 Map 工作所需的容器分配較高的優先順序,而為 Reduce 工作的容器分配較低的優先順序。

private ContainerRequest setupContainerAskForRM() {
  // setup requirements for hosts
  // using * as any host will do for the distributed shell app
  // set the priority for the request
  Priority pri = Priority.newInstance(requestPriority);

  // Set up resource type requirements
  // For now, memory and CPU are supported so we set memory and cpu requirements
  Resource capability = Resource.newInstance(containerMemory,

  ContainerRequest request = new ContainerRequest(capability, null, null,
  LOG.info("Requested container ask: " + request.toString());
  return request;
  • 在應用程式管理員發送容器配置要求後,容器將由 AMRMClientAsync 客戶端的事件處理常式非同步啟動。處理常式應實作 AMRMClientAsync.CallbackHandler 介面。
  • 當有容器配置時,處理常式會設定執行啟動容器程式碼的執行緒。在此,我們使用名稱 LaunchContainerRunnable 來示範。我們將在本文的以下部分討論 LaunchContainerRunnable 類別。
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);

    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
  • 在心跳上,事件處理常式會報告應用程式的進度。
public float getProgress() {
  // set progress to deliver to RM on next heartbeat
  float progress = (float) numCompletedContainers.get()
      / numTotalContainers;
  return progress;
  • 容器啟動執行緒實際上是在 NM 上啟動容器。在容器配置給 AM 後,它需要遵循與客戶端在為將在配置的容器上執行的最終工作設定 ContainerLaunchContext 時所遵循的類似程序。一旦定義了 ContainerLaunchContext,AM 就可以透過 NMClientAsync 啟動它。
// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);

// Set executable command
// Set shell script path
if (!scriptPath.isEmpty()) {
  vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
    : ExecShellStringPath);

// Set args for the shell command if any
// Add log redirect params
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");

List<String> commands = new ArrayList<String>();

// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.

// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
  localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
  • NMClientAsync 物件連同其事件處理常式會處理容器事件。包括容器啟動、停止、狀態更新,以及發生錯誤。

  • 在 ApplicationMaster 確定工作已完成後,它需要透過 AM-RM 資訊客戶端取消自己的註冊,然後停止資訊客戶端。

try {
  amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
  LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
  LOG.error("Failed to unregister application", e);



我如何將我的應用程式的 jar 檔案分發到 YARN 集群中需要它的所有節點?

您可以使用 LocalResource 將資源新增到您的應用程式要求中。這將導致 YARN 將資源分發到 ApplicationMaster 節點。如果資源是 tgz、zip 或 jar,您可以讓 YARN 解壓縮它。然後,您需要做的就是將解壓縮的資料夾新增到您的 classpath 中。例如,在建立您的應用程式要求時

File packageFile = new File(packagePath);
URL packageUrl = ConverterUtils.getYarnUrlFromPath(
    FileContext.getFileContext().makeQualified(new Path(packagePath)));


    "java -cp './package/*' some.class.to.Run "
    + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
    + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    Collections.singletonMap("package", packageResource));

如您所見,setLocalResources 命令會取得名稱對應資源的映射。名稱會成為應用程式 cwd 中的符號連結,因此您只要使用 ./package/* 就能參考裡面的成品。

注意:Java 的 classpath (cp) 參數非常敏感。請確保語法完全正確。

一旦您的套件分發到您的 AM,您需要在您的 AM 啟動新的容器時遵循相同的程序(假設您希望資源傳送到您的容器)。這段程式碼是相同的。您只需要確保您提供套件路徑(HDFS 或本機)給您的 AM,以便它可以將資源 URL 與容器 ctx 一起傳送。

我如何取得 ApplicationMaster 的 ApplicationAttemptId

ApplicationAttemptId 會透過環境傳遞給 AM,而來自環境的值可以透過 ConverterUtils 輔助函式轉換成 ApplicationAttemptId 物件。

為什麼我的容器會被 NodeManager 殺死?

這可能是因為高記憶體使用量超過您要求的容器記憶體大小。這有許多原因可能導致此問題。首先,查看 NodeManager 在殺死您的容器時傾出的程序樹。您感興趣的兩件事是實體記憶體和虛擬記憶體。如果您超過實體記憶體限制,表示您的應用程式使用過多實體記憶體。如果您執行 Java 應用程式,您可以使用 -hprof 來查看堆積中佔用空間的內容。如果您超過虛擬記憶體,您可能需要增加群集範圍設定變數 yarn.nodemanager.vmem-pmem-ratio 的值。


在啟動容器時在命令列上設定 -Djava.library.path 可能導致 Hadoop 使用的原生函式庫無法正確載入,並可能導致錯誤。建議改用 LD_LIBRARY_PATH



