本文檔以高階方式描述如何為 YARN 實作新的應用程式。
一般概念是應用程式提交客戶端會將應用程式提交到 YARN ResourceManager (RM)。這可以透過設定 YarnClient
物件來完成。在 YarnClient
啟動後,客戶端接著可以設定應用程式內容,準備包含ApplicationMaster (AM) 的應用程式的第一個容器,然後提交應用程式。您需要提供資訊,例如應用程式執行所需的本機檔案/jar 檔案的詳細資料、需要執行的實際指令 (加上必要的命令列引數)、任何作業系統環境設定 (選用) 等。實際上,您需要描述需要為您的 ApplicationMaster 啟動的 Unix 程序。
YARN ResourceManager 隨後會在已配置的容器上啟動 ApplicationMaster(如指定)。ApplicationMaster 與 YARN 集群進行通訊,並處理應用程式執行。它以非同步方式執行作業。在應用程式啟動期間,ApplicationMaster 的主要任務為:a) 與 ResourceManager 通訊,以協商和配置未來容器的資源,以及 b) 在容器配置後,與 YARN NodeManager(NM)通訊,以在其中啟動應用程式容器。任務 a) 可透過 AMRMClientAsync
物件非同步執行,並在 AMRMClientAsync.CallbackHandler
類型的事件處理常式中指定事件處理方法。必須將事件處理常式明確設定為客戶端。任務 b) 可透過啟動可執行物件來執行,然後在配置容器時啟動容器。作為啟動此容器的一部分,AM 必須指定包含啟動資訊(例如命令列規格、環境等)的 ContainerLaunchContext
。
在應用程式執行期間,ApplicationMaster 會透過 NMClientAsync
物件與 NodeManager 通訊。所有容器事件均由與 NMClientAsync
關聯的 NMClientAsync.CallbackHandler
處理。典型的回呼處理常式會處理客戶端啟動、停止、狀態更新和錯誤。ApplicationMaster 也會透過處理 AMRMClientAsync.CallbackHandler
的 getProgress()
方法,向 ResourceManager 回報執行進度。
除了非同步客戶端之外,還有某些工作流程的同步版本(AMRMClient
和 NMClient
)。建議使用非同步客戶端,因為(主觀上)用法較為簡單,而且本文將主要涵蓋非同步客戶端。請參閱 AMRMClient
和 NMClient
以取得有關同步客戶端的更多資訊。
以下是重要的介面
客戶端<-->ResourceManager
透過使用 YarnClient
物件。
ApplicationMaster<-->ResourceManager
透過使用 AMRMClientAsync
物件,並透過 AMRMClientAsync.CallbackHandler
非同步處理事件
ApplicationMaster<-->NodeManager
啟動容器。透過使用 NMClientAsync
物件與 NodeManager 通訊,並透過 NMClientAsync.CallbackHandler
處理容器事件
注意
YARN 應用程式的三個主要通訊協定(ApplicationClientProtocol、ApplicationMasterProtocol 和 ContainerManagementProtocol)仍保留。這三個客戶端會封裝這三個通訊協定,以提供較為簡易的 YARN 應用程式程式設計模型。
在極為罕見的情況下,程式設計人員可能想要直接使用這三個通訊協定來實作應用程式。不過,請注意此類行為不再建議用於一般使用案例。
用戶端需要執行的第一步是初始化並啟動 YarnClient。
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();
設定用戶端後,用戶端需要建立應用程式並取得其應用程式 ID。
YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
YarnClientApplication
對新應用程式的回應也包含叢集資訊,例如叢集的最小/最大資源功能。這是必要的,以確保您可以正確設定應用程式主控程式將啟動的容器規格。請參閱 GetNewApplicationResponse
以取得更多詳細資料。
用戶端的主要重點是設定 ApplicationSubmissionContext
,其中定義了資源管理程式啟動 AM 所需的所有資訊。用戶端需要將下列內容設定到內容中
應用程式資訊:ID、名稱
佇列、優先順序資訊:將提交應用程式的佇列、要指派給應用程式的優先順序。
使用者:提交應用程式的使用者
ContainerLaunchContext
:定義 AM 將啟動並執行的容器的資訊。如前所述,ContainerLaunchContext
定義執行應用程式所需的所有必要資訊,例如本機R資源(二進位檔、JAR 檔、檔案等)、E環境設定(CLASSPATH 等)、要執行的C指令和安全性Token(RECT)。
// set the application submission context ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(conf); addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); // Set the log4j properties if needed if (!log4jPropFile.isEmpty()) { addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null); } // The shell script has to be made available on the final container(s) // where it will be executed. // To do this, we need to first copy into the filesystem that is visible // to the yarn framework. // We do not need to set this as a local resource for the application // master as the application master does not need it. String hdfsShellScriptLocation = ""; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; if (!shellScriptPath.isEmpty()) { Path shellSrc = new Path(shellScriptPath); String shellPathSuffix = appName + "/" + appId.toString() + "/" + SCRIPT_PATH; Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); fs.copyFromLocalFile(false, true, shellSrc, shellDst); hdfsShellScriptLocation = shellDst.toUri().toString(); FileStatus shellFileStatus = fs.getFileStatus(shellDst); hdfsShellScriptLen = shellFileStatus.getLen(); hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); } if (!shellCommand.isEmpty()) { addToLocalResources(fs, null, shellCommandPath, appId.toString(), localResources, shellCommand); } if (shellArgs.length > 0) { addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " ")); } // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); Map<String, String> env = new HashMap<String, String>(); // put location of shell script into env // using the env info, the application master will create the correct local resource for the // eventual containers that will be launched to execute the shell scripts env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. // It should be provided out of the box. // For now setting all required classpaths including // the classpath to "." for the application jar StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); for (String c : conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); classPathEnv.append(c.trim()); } classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( "./log4j.properties"); // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); // Set java executable command LOG.info("Setting up app master command"); vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); // Set Xmx based on am memory size vargs.add("-Xmx" + amMemory + "m"); // Set class name vargs.add(appMasterMainClass); // Set params for Application Master vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); for (Map.Entry<String, String> entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); } if (debugFlag) { vargs.add("--debug"); } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } LOG.info("Completed setting up app master command " + command.toString()); List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up the container launch context for the application master ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application // Not needed in this scenario // amContainer.setServiceData(serviceData); // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce Credentials credentials = new Credentials(); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); if (tokenRenewer == null | | tokenRenewer.length() == 0) { throw new IOException( "Can't get Master Kerberos principal for the RM to use as renewer"); } // For now, only getting tokens for the default file-system. final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials); if (tokens != null) { for (Token<?> token : tokens) { LOG.info("Got dt for " + fs.getUri() + "; " + token); } } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(fsTokens); } appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); yarnClient.submitApplication(appContext);
此時,資源管理程式將接受應用程式,並在背景中執行配置符合所需規格的容器的程序,然後最終在配置的容器上設定並啟動 AM。
用戶端有多種方式可以追蹤實際任務的進度。
- 它可以與資源管理程式通訊,並透過
YarnClient
的getApplicationReport()
方法要求應用程式的報告。
// Get application report for the appId we are interested in ApplicationReport report = yarnClient.getApplicationReport(appId);
從資源管理程式收到的應用程式報告包含下列內容
一般應用程式資訊:應用程式 ID、提交應用程式的佇列、提交應用程式的使用者和應用程式的開始時間。
應用程式主控程式詳細資料:AM 執行的主機、它用於偵聽用戶端要求的 RPC 埠(如果有)和用戶端與 AM 通訊所需的 Token。
應用程式追蹤資訊:如果應用程式支援某種形式的進度追蹤,它可以設定一個追蹤網址,可透過
ApplicationReport
的getTrackingUrl()
方法取得,讓客戶端可以查看以監控進度。應用程式狀態:ResourceManager 所看到的應用程式狀態可透過
ApplicationReport#getYarnApplicationState
取得。如果YarnApplicationState
設定為FINISHED
,客戶端應參照ApplicationReport#getFinalApplicationStatus
以查看應用程式任務本身的實際成功/失敗。在失敗的情況下,ApplicationReport#getDiagnostics
可能有助於更深入了解失敗原因。
- 如果 ApplicationMaster 支援,客戶端可以直接透過從應用程式報告取得的 host:rpcport 資訊,向 AM 本身查詢進度更新。如果可用,它也可以使用從報告取得的追蹤網址。
YarnClient
支援 killApplication
呼叫,讓客戶端可以透過 ResourceManager 向 AM 傳送終止訊號。ApplicationMaster 如果有此設計,也可能透過其 rpc 層支援中止呼叫,讓客戶端可以利用。
yarnClient.killApplication(appId);
AM 是工作的實際擁有者。它將由 RM 啟動,並透過客戶端提供所有必要的資訊和資源,關於它受託監督和完成的工作。
由於 AM 在容器中啟動,而該容器可能會(很可能會)與其他容器共用一個實體主機,因此在多租戶性質中,除了其他問題之外,它不能對可以監聽的預先設定埠等事項做出任何假設。
當 AM 啟動時,會透過環境提供幾個參數給它。這些參數包括 AM 容器的 ContainerId
、應用程式提交時間,以及執行 ApplicationMaster 的 NM(節點管理員)主機的詳細資料。參照 ApplicationConstants
以取得參數名稱。
與 RM 的所有互動都需要 ApplicationAttemptId
(如果發生失敗,每個應用程式可能有多次嘗試)。ApplicationAttemptId
可以從 AM 的容器 ID 取得。有輔助 API 可以將從環境取得的值轉換為物件。
Map<String, String> envs = System.getenv(); String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); if (containerIdString == null) { // container id should always be set in the env by the framework throw new IllegalArgumentException( "ContainerId not set in the environment"); } ContainerId containerId = ConverterUtils.toContainerId(containerIdString); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
存取,預設值由 YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
定義。ApplicationMaster 需要向 ResourceManager 註冊自己,才能開始發出心跳。// Register self with ResourceManager // This will start heartbeating to the RM appMasterHostname = NetUtils.getHostname(); RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Max mem capability of resources in this cluster " + maxMem); int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capability of resources in this cluster " + maxVCores); // A resource ask cannot exceed the max. if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster." + " Using max value." + ", specified=" + containerMemory + ", max=" + maxMem); containerMemory = maxMem; } if (containerVirtualCores > maxVCores) { LOG.info("Container virtual cores specified above max threshold of cluster." + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores); containerVirtualCores = maxVCores; } List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration.");
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration."); int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size(); // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); }
setupContainerAskForRM()
中,以下兩件事需要一些設定
資源功能:目前,YARN 支援基於記憶體的資源需求,因此要求應定義需要多少記憶體。值以 MB 定義,且必須小於叢集的最大功能,並且是最小功能的精確倍數。記憶體資源對應於施加在工作容器上的實體記憶體限制。它也將支援基於運算的資源 (vCore),如程式碼所示。
優先順序:在要求一組容器時,AM 可以為每組定義不同的優先順序。例如,Map-Reduce AM 可能為 Map 工作所需的容器分配較高的優先順序,而為 Reduce 工作的容器分配較低的優先順序。
private ContainerRequest setupContainerAskForRM() { // setup requirements for hosts // using * as any host will do for the distributed shell app // set the priority for the request Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); LOG.info("Requested container ask: " + request.toString()); return request; }
AMRMClientAsync
客戶端的事件處理常式非同步啟動。處理常式應實作 AMRMClientAsync.CallbackHandler
介面。
- 當有容器配置時,處理常式會設定執行啟動容器程式碼的執行緒。在此,我們使用名稱
LaunchContainerRunnable
來示範。我們將在本文的以下部分討論LaunchContainerRunnable
類別。
@Override public void onContainersAllocated(List<Container> allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); launchThread.start(); } }
- 在心跳上,事件處理常式會報告應用程式的進度。
@Override public float getProgress() { // set progress to deliver to RM on next heartbeat float progress = (float) numCompletedContainers.get() / numTotalContainers; return progress; }
ContainerLaunchContext
時所遵循的類似程序。一旦定義了 ContainerLaunchContext
,AM 就可以透過 NMClientAsync
啟動它。// Set the necessary command to execute on the allocated container Vector<CharSequence> vargs = new Vector<CharSequence>(5); // Set executable command vargs.add(shellCommand); // Set shell script path if (!scriptPath.isEmpty()) { vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath); } // Set args for the shell command if any vargs.add(shellArgs); // Add log redirect params vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up ContainerLaunchContext, setting local resource, environment, // command and token for constructor. // Note for tokens: Set up tokens for the container too. Today, for normal // shell commands, the container in distribute-shell doesn't need any // tokens. We are populating them mainly for NodeManagers to be able to // download anyfiles in the distributed file-system. The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx);
NMClientAsync
物件連同其事件處理常式會處理容器事件。包括容器啟動、停止、狀態更新,以及發生錯誤。
在 ApplicationMaster 確定工作已完成後,它需要透過 AM-RM 資訊客戶端取消自己的註冊,然後停止資訊客戶端。
try { amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); } catch (YarnException ex) { LOG.error("Failed to unregister application", ex); } catch (IOException e) { LOG.error("Failed to unregister application", e); } amRMClient.stop();
您可以使用 LocalResource 將資源新增到您的應用程式要求中。這將導致 YARN 將資源分發到 ApplicationMaster 節點。如果資源是 tgz、zip 或 jar,您可以讓 YARN 解壓縮它。然後,您需要做的就是將解壓縮的資料夾新增到您的 classpath 中。例如,在建立您的應用程式要求時
File packageFile = new File(packagePath); URL packageUrl = ConverterUtils.getYarnUrlFromPath( FileContext.getFileContext().makeQualified(new Path(packagePath))); packageResource.setResource(packageUrl); packageResource.setSize(packageFile.length()); packageResource.setTimestamp(packageFile.lastModified()); packageResource.setType(LocalResourceType.ARCHIVE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); resource.setMemory(memory); containerCtx.setResource(resource); containerCtx.setCommands(ImmutableList.of( "java -cp './package/*' some.class.to.Run " + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")); containerCtx.setLocalResources( Collections.singletonMap("package", packageResource)); appCtx.setApplicationId(appId); appCtx.setUser(user.getShortUserName); appCtx.setAMContainerSpec(containerCtx); yarnClient.submitApplication(appCtx);
如您所見,setLocalResources
命令會取得名稱對應資源的映射。名稱會成為應用程式 cwd 中的符號連結,因此您只要使用 ./package/* 就能參考裡面的成品。
注意:Java 的 classpath (cp) 參數非常敏感。請確保語法完全正確。
一旦您的套件分發到您的 AM,您需要在您的 AM 啟動新的容器時遵循相同的程序(假設您希望資源傳送到您的容器)。這段程式碼是相同的。您只需要確保您提供套件路徑(HDFS 或本機)給您的 AM,以便它可以將資源 URL 與容器 ctx 一起傳送。
ApplicationAttemptId
?ApplicationAttemptId
會透過環境傳遞給 AM,而來自環境的值可以透過 ConverterUtils 輔助函式轉換成 ApplicationAttemptId
物件。
這可能是因為高記憶體使用量超過您要求的容器記憶體大小。這有許多原因可能導致此問題。首先,查看 NodeManager 在殺死您的容器時傾出的程序樹。您感興趣的兩件事是實體記憶體和虛擬記憶體。如果您超過實體記憶體限制,表示您的應用程式使用過多實體記憶體。如果您執行 Java 應用程式,您可以使用 -hprof 來查看堆積中佔用空間的內容。如果您超過虛擬記憶體,您可能需要增加群集範圍設定變數 yarn.nodemanager.vmem-pmem-ratio
的值。
在啟動容器時在命令列上設定 -Djava.library.path
可能導致 Hadoop 使用的原生函式庫無法正確載入,並可能導致錯誤。建議改用 LD_LIBRARY_PATH
。
YARN 分散式 shell:在設定好開發環境後,在 hadoop-yarn-applications-distributedshell
專案中。