summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoravijayanhwx <avijayan@hortonworks.com>2018-05-31 18:43:44 -0700
committerGitHub <noreply@github.com>2018-05-31 18:43:44 -0700
commit607fe42f121e9453de3938fcff3f850c89866e0a (patch)
tree90d6371dea9c11350b123aac7c09dd03b48662aa
parentf20beb60ebed026e0c9e7764fedcd90aeb43bcab (diff)
AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (#1405)
* AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (Unit tests) * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
-rw-r--r--ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java252
-rw-r--r--ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java8
-rw-r--r--ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java19
-rw-r--r--ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java5
-rw-r--r--ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml8
-rw-r--r--ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml10
-rw-r--r--ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py6
-rw-r--r--ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py1
-rw-r--r--ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java283
9 files changed, 587 insertions, 5 deletions
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java
new file mode 100644
index 0000000000..8b0f631118
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.checks;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestResourceProvider;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.UpgradePack;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * This pre upgrade check verifies if the Ambari Metrics Hadoop Sink package version on all hosts is the expected one
+ * corresponding to the stack version. For example, in HDP 3.0, the corresponding ambari-metrics-hadoop-sink version should
+ * be 2.7.0.0.
+ */
+@Singleton
+@UpgradeCheck(
+ group = UpgradeCheckGroup.REPOSITORY_VERSION)
+public class AmbariMetricsHadoopSinkVersionCompatibilityCheck extends AbstractCheckDescriptor {
+
+ @Inject
+ private RequestDAO requestDAO;
+
+ @Inject
+ private HostRoleCommandDAO hostRoleCommandDAO;
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmbariMetricsHadoopSinkVersionCompatibilityCheck.class);
+
+ private enum PreUpgradeCheckStatus {SUCCESS, FAILED, RUNNING}
+
+ static final String HADOOP_SINK_VERSION_NOT_SPECIFIED = "hadoop-sink-version-not-specified";
+
+ static final String MIN_HADOOP_SINK_VERSION_PROPERTY_NAME = "min-hadoop-sink-version";
+ static final String RETRY_INTERVAL_PROPERTY_NAME = "request-status-check-retry-interval";
+ static final String NUM_TRIES_PROPERTY_NAME = "request-status-check-num-retries";
+
+ /**
+ * Total wait time for Ambari Server request time to finish => 2 mins.
+ */
+ private long retryInterval = 6000l; // 6 seconds sleep interval per retry.
+ private int numTries = 20; // 20 times the check will try to see if request finished.
+
+ /**
+ * Constructor.
+ */
+ public AmbariMetricsHadoopSinkVersionCompatibilityCheck() {
+ super(CheckDescription.AMS_HADOOP_SINK_VERSION_COMPATIBILITY);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<String> getApplicableServices() {
+ return Sets.newHashSet("AMBARI_METRICS", "HDFS");
+ }
+
+ @Override
+ public void perform(PrerequisiteCheck prerequisiteCheck, PrereqCheckRequest prereqCheckRequest) throws AmbariException {
+
+ String minHadoopSinkVersion = null;
+
+ UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = prereqCheckRequest.getPrerequisiteCheckConfig();
+ Map<String, String> checkProperties = null;
+ if(prerequisiteCheckConfig != null) {
+ checkProperties = prerequisiteCheckConfig.getCheckProperties(this.getClass().getName());
+ }
+
+ if(checkProperties != null) {
+ minHadoopSinkVersion = checkProperties.get(MIN_HADOOP_SINK_VERSION_PROPERTY_NAME);
+ retryInterval = Long.valueOf(checkProperties.getOrDefault(RETRY_INTERVAL_PROPERTY_NAME, "6000"));
+ numTries = Integer.valueOf(checkProperties.getOrDefault(NUM_TRIES_PROPERTY_NAME, "20"));
+ }
+
+ if (StringUtils.isEmpty(minHadoopSinkVersion)) {
+ LOG.debug("Hadoop Sink version for pre-check not specified.");
+ prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+ prerequisiteCheck.setFailReason(getFailReason(HADOOP_SINK_VERSION_NOT_SPECIFIED, prerequisiteCheck, prereqCheckRequest));
+ return;
+ }
+
+ LOG.debug("Properties : Hadoop Sink Version = {} , retryInterval = {}, numTries = {}", minHadoopSinkVersion, retryInterval, numTries);
+
+ AmbariManagementController ambariManagementController = AmbariServer.getController();
+
+ ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+ Resource.Type.Request,
+ ambariManagementController
+ );
+
+ String clusterName = prereqCheckRequest.getClusterName();
+
+ Set<String> hosts = ambariManagementController.getClusters()
+ .getCluster(clusterName).getHosts("AMBARI_METRICS", "METRICS_MONITOR");
+
+ if (CollectionUtils.isEmpty(hosts)) {
+ LOG.warn("No hosts have the component METRICS_MONITOR.");
+ prerequisiteCheck.setStatus(PrereqCheckStatus.PASS);
+ return;
+ }
+
+ Set<Map<String, Object>> propertiesSet = new HashSet<>();
+ Map<String, Object> properties = new LinkedHashMap<>();
+ properties.put(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID, clusterName);
+
+ Set<Map<String, Object>> filterSet = new HashSet<>();
+ Map<String, Object> filterMap = new HashMap<>();
+ filterMap.put(RequestResourceProvider.SERVICE_ID, "AMBARI_METRICS");
+ filterMap.put(RequestResourceProvider.COMPONENT_ID, "METRICS_MONITOR");
+ filterMap.put(RequestResourceProvider.HOSTS_ID, StringUtils.join(hosts,","));
+ filterSet.add(filterMap);
+
+ properties.put(RequestResourceProvider.REQUEST_RESOURCE_FILTER_ID, filterSet);
+ propertiesSet.add(properties);
+
+ Map<String, String> requestInfoProperties = new HashMap<>();
+ requestInfoProperties.put(RequestResourceProvider.COMMAND_ID, "CHECK_HADOOP_SINK_VERSION");
+ requestInfoProperties.put(RequestResourceProvider.REQUEST_CONTEXT_ID, "Pre Upgrade check for compatible Hadoop Metric " +
+ "Sink version on all hosts.");
+
+ Request request = PropertyHelper.getCreateRequest(propertiesSet, requestInfoProperties);
+ try {
+ org.apache.ambari.server.controller.spi.RequestStatus response = provider.createResources(request);
+ Resource responseResource = response.getRequestResource();
+ String requestIdProp = PropertyHelper.getPropertyId("Requests", "id");
+ long requestId = (long) responseResource.getPropertyValue(requestIdProp);
+ LOG.debug("RequestId for AMS Hadoop Sink version compatibility pre check : " + requestId);
+
+ Thread.sleep(retryInterval);
+ PreUpgradeCheckStatus status;
+ int retry = 0;
+ LinkedHashSet<String> failedHosts = new LinkedHashSet<>();
+ while ((status = pollRequestStatus(requestId, failedHosts)).equals(PreUpgradeCheckStatus.RUNNING)
+ && retry++ < numTries) {
+ if (retry != numTries) {
+ Thread.sleep(retryInterval);
+ }
+ }
+
+ if (status.equals(PreUpgradeCheckStatus.SUCCESS)) {
+ prerequisiteCheck.setStatus(PrereqCheckStatus.PASS);
+ } else {
+ prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+ prerequisiteCheck.setFailReason(String.format(getFailReason(prerequisiteCheck, prereqCheckRequest), minHadoopSinkVersion));
+ prerequisiteCheck.setFailedOn(failedHosts);
+ }
+ } catch (Exception e) {
+ LOG.error("Error running Pre Upgrade check for AMS Hadoop Sink compatibility. " + e);
+ prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+ }
+ }
+
+ /**
+ * Get the status of the requestId and also the set of failed hosts if any.
+ * @param requestId RequestId to track.
+ * @param failedHosts populate this argument for failed hosts.
+ * @return Status of the request.
+ * @throws Exception
+ */
+ private PreUpgradeCheckStatus pollRequestStatus(long requestId, Set<String> failedHosts) throws Exception {
+
+ List<RequestEntity> requestEntities = requestDAO.findByPks(Collections.singleton(requestId), true);
+ if (requestEntities != null && requestEntities.size() > 0) {
+
+ RequestEntity requestEntity = requestEntities.iterator().next();
+ HostRoleStatus requestStatus = requestEntity.getStatus();
+
+ if (HostRoleStatus.COMPLETED.equals(requestStatus)) {
+ return PreUpgradeCheckStatus.SUCCESS;
+ }
+
+ else if (requestStatus.isFailedState()) {
+ failedHosts.addAll(getPreUpgradeCheckFailedHosts(requestEntity));
+ LOG.debug("Hadoop Sink version check failed on the following hosts : " + failedHosts.stream().collect(Collectors.joining(",")));
+ return PreUpgradeCheckStatus.FAILED;
+ } else {
+ return PreUpgradeCheckStatus.RUNNING;
+ }
+ } else {
+ LOG.error("Unable to find RequestEntity for created request.");
+ }
+ return PreUpgradeCheckStatus.FAILED;
+ }
+
+ /**
+ * Get the set of hosts (tasks) that failed the check.
+ * @param requestEntity request info to get the task level info.
+ * @return Set of hosts which failed the check along with type of failed state.
+ * @throws Exception
+ */
+ private Set<String> getPreUpgradeCheckFailedHosts(RequestEntity requestEntity) throws Exception {
+
+ List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId(), true);
+
+ Set<String> failedHosts = new LinkedHashSet<>();
+ for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandEntities) {
+ HostRoleStatus status = hostRoleCommandEntity.getStatus();
+ if (status.isFailedState()) {
+ failedHosts.add(hostRoleCommandEntity.getHostName() + "(" + status + ")");
+ }
+ }
+ return failedHosts;
+ }
+} \ No newline at end of file
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
index 76b8e236a9..f19b957e74 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
@@ -219,6 +219,14 @@ public class CheckDescription {
.put(HiveDynamicServiceDiscoveryCheck.HIVE_DYNAMIC_SERVICE_ZK_NAMESPACE_KEY,
"The hive-site.xml property hive.server2.zookeeper.namespace should be set to the value for the root namespace on ZooKeeper.").build());
+ public static CheckDescription AMS_HADOOP_SINK_VERSION_COMPATIBILITY = new CheckDescription("AMS_HADOOP_SINK_VERSION_COMPATIBILITY",
+ PrereqCheckType.HOST,
+ "Ambari Metrics Hadoop Sinks need to be compatible with the stack version. This check ensures that compatibility.",
+ new ImmutableMap.Builder<String, String>().put(AbstractCheckDescriptor.DEFAULT,"Hadoop Sink version check failed. " +
+ "To fix this, please upgrade 'ambari-metrics-hadoop-sink' package to %s on all the failed hosts")
+ .put(AmbariMetricsHadoopSinkVersionCompatibilityCheck.HADOOP_SINK_VERSION_NOT_SPECIFIED, "Hadoop Sink version for pre-check not specified. " +
+ "Please use 'min-hadoop-sink-version' property in upgrade pack to specify min hadoop sink version").build());
+
public static CheckDescription CONFIG_MERGE = new CheckDescription("CONFIG_MERGE",
PrereqCheckType.CLUSTER,
"Configuration Merge Check",
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 010ccec4ea..b7352572b1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -62,6 +62,8 @@ import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity_;
import org.apache.ambari.server.orm.entities.StageEntity;
+import org.eclipse.persistence.config.HintValues;
+import org.eclipse.persistence.config.QueryHints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -492,10 +494,19 @@ public class HostRoleCommandDAO {
@RequiresSession
public List<HostRoleCommandEntity> findByRequest(long requestId) {
- TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
- "FROM HostRoleCommandEntity command " +
- "WHERE command.requestId=?1 ORDER BY command.taskId", HostRoleCommandEntity.class);
- return daoUtils.selectList(query, requestId);
+ return findByRequest(requestId, false);
+ }
+
+ @RequiresSession
+ public List<HostRoleCommandEntity> findByRequest(long requestId, boolean refreshHint) {
+ TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
+ "HostRoleCommandEntity.findByRequestId",
+ HostRoleCommandEntity.class);
+ if (refreshHint) {
+ query.setHint(QueryHints.REFRESH, HintValues.TRUE);
+ }
+ query.setParameter("requestId", requestId);
+ return daoUtils.selectList(query);
}
@RequiresSession
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 0eea7e67e1..0b99667db5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -107,7 +107,10 @@ import org.apache.commons.lang.ArrayUtils;
query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
@NamedQuery(
name = "HostRoleCommandEntity.findLatestServiceChecksByRole",
- query = "SELECT NEW org.apache.ambari.server.orm.dao.HostRoleCommandDAO.LastServiceCheckDTO(command.role, MAX(command.endTime)) FROM HostRoleCommandEntity command WHERE command.roleCommand = :roleCommand AND command.endTime > 0 AND command.stage.clusterId = :clusterId GROUP BY command.role ORDER BY command.role ASC")
+ query = "SELECT NEW org.apache.ambari.server.orm.dao.HostRoleCommandDAO.LastServiceCheckDTO(command.role, MAX(command.endTime)) FROM HostRoleCommandEntity command WHERE command.roleCommand = :roleCommand AND command.endTime > 0 AND command.stage.clusterId = :clusterId GROUP BY command.role ORDER BY command.role ASC"),
+ @NamedQuery(
+ name = "HostRoleCommandEntity.findByRequestId",
+ query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId ORDER BY command.taskId")
})
public class HostRoleCommandEntity {
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index ec30ea50a0..1411ba1c8c 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -144,6 +144,14 @@
<on-ambari-upgrade add="true"/>
</property>
<property>
+ <name>min_ambari_metrics_hadoop_sink_version</name>
+ <value>2.7.0.0</value>
+ <description>
+ Minimum version of ambari metrics hadoop sink that is compatible with this version of AMS.
+ </description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
<name>content</name>
<display-name>ams-env template</display-name>
<value>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 20aaab36bf..5a6cd45819 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -87,6 +87,16 @@
<scriptType>PYTHON</scriptType>
<timeout>1200</timeout>
</commandScript>
+   <customCommands>
+     <customCommand>
+       <name>CHECK_HADOOP_SINK_VERSION</name>
+       <commandScript>
+         <script>scripts/metrics_monitor.py</script>
+         <scriptType>PYTHON</scriptType>
+         <timeout>600</timeout>
+       </commandScript>
+     </customCommand>
+   </customCommands>
<logs>
<log>
<logId>ams_monitor</logId>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
index 16c7997254..03176ab55e 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
@@ -22,6 +22,7 @@ from resource_management.libraries.script.script import Script
from ams import ams
from ams_service import ams_service
from status import check_service_status
+from ambari_commons.repo_manager.repo_manager_helper import check_installed_metrics_hadoop_sink_version
class AmsMonitor(Script):
def install(self, env):
@@ -67,6 +68,11 @@ class AmsMonitor(Script):
import params
return params.ams_user
+ def check_hadoop_sink_version(self, env):
+ import params
+ check_installed_metrics_hadoop_sink_version(checked_version=params.min_hadoop_sink_version,
+ less_valid=False,
+ equal_valid=True)
if __name__ == "__main__":
AmsMonitor().execute()
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 08fa67551d..458f45a7ff 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -394,6 +394,7 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam
clusterHostInfoDict = config["clusterHostInfo"]
+min_hadoop_sink_version = default("/configurations/ams-env/min_ambari_metrics_hadoop_sink_version", "2.7.0.0")
hdfs_site = config['configurations']['hdfs-site']
default_fs = config['configurations']['core-site']['fs.defaultFS']
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java
new file mode 100644
index 0000000000..7209fc974a
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.checks;
+
+import static org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck.MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.RepositoryType;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.repository.ClusterVersionSummary;
+import org.apache.ambari.server.state.repository.VersionDefinitionXml;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.UpgradePack;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.google.inject.Provider;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest ({AmbariServer.class, AbstractControllerResourceProvider.class, PropertyHelper.class})
+public class AmbariMetricsHadoopSinkVersionCheckTest {
+ private final Clusters m_clusters = Mockito.mock(Clusters.class);
+ private final AmbariMetricsHadoopSinkVersionCompatibilityCheck m_check = new AmbariMetricsHadoopSinkVersionCompatibilityCheck();
+ private final RepositoryVersionDAO repositoryVersionDAO = Mockito.mock(
+ RepositoryVersionDAO.class);
+
+ private ClusterVersionSummary m_clusterVersionSummary;
+
+ private VersionDefinitionXml m_vdfXml;
+
+ private RepositoryVersionEntity m_repositoryVersion;
+
+ final Map<String, Service> m_services = new HashMap<>();
+
+ /**
+ *
+ */
+ @Before
+ public void setup() throws Exception {
+
+ m_repositoryVersion = Mockito.mock(RepositoryVersionEntity.class);
+ m_clusterVersionSummary = Mockito.mock(ClusterVersionSummary.class);
+ m_vdfXml = Mockito.mock(VersionDefinitionXml.class);
+ MockitoAnnotations.initMocks(this);
+
+ m_check.clustersProvider = new Provider<Clusters>() {
+
+ @Override
+ public Clusters get() {
+ return m_clusters;
+ }
+ };
+ Configuration config = Mockito.mock(Configuration.class);
+ m_check.config = config;
+
+ when(m_repositoryVersion.getVersion()).thenReturn("3.0.0.0-1234");
+ when(m_repositoryVersion.getStackId()).thenReturn(new StackId("HDP", "3.0"));
+
+ m_services.clear();
+
+ when(m_repositoryVersion.getType()).thenReturn(RepositoryType.STANDARD);
+ when(m_repositoryVersion.getRepositoryXml()).thenReturn(m_vdfXml);
+ when(m_vdfXml.getClusterSummary(Mockito.any(Cluster.class))).thenReturn(m_clusterVersionSummary);
+ when(m_clusterVersionSummary.getAvailableServiceNames()).thenReturn(m_services.keySet());
+
+ }
+
+ /**
+ * Tests that the check is applicable when hive is installed.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIsApplicable() throws Exception {
+ final Cluster cluster = Mockito.mock(Cluster.class);
+
+ when(cluster.getClusterId()).thenReturn(1L);
+ when(m_clusters.getCluster("cluster")).thenReturn(cluster);
+ when(cluster.getServices()).thenReturn(m_services);
+
+ m_services.put("HIVE", Mockito.mock(Service.class));
+
+ PrereqCheckRequest request = new PrereqCheckRequest("cluster");
+ request.setTargetRepositoryVersion(m_repositoryVersion);
+
+ Assert.assertFalse(m_check.isApplicable(request));
+
+ m_services.put("HDFS", Mockito.mock(Service.class));
+
+ m_check.repositoryVersionDaoProvider = new Provider<RepositoryVersionDAO>() {
+ @Override
+ public RepositoryVersionDAO get() {
+ return repositoryVersionDAO;
+ }
+ };
+
+ when(repositoryVersionDAO.findByStackNameAndVersion(Mockito.anyString(),
+ Mockito.anyString())).thenReturn(m_repositoryVersion);
+
+ Assert.assertTrue(m_check.isApplicable(request));
+ }
+
+ /**
+ * Tests that the warning is correctly tripped when there are not enough
+ * metastores.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testPerform() throws Exception {
+
+ AmbariManagementController ambariManagementControllerMock = Mockito.mock(AmbariManagementController.class);
+ PowerMockito.mockStatic(AmbariServer.class);
+ when(AmbariServer.getController()).thenReturn(ambariManagementControllerMock);
+
+ ResourceProvider resourceProviderMock = mock(ResourceProvider.class);
+ PowerMockito.mockStatic(AbstractControllerResourceProvider.class);
+ when(AbstractControllerResourceProvider.getResourceProvider(eq(Resource.Type.Request), any(AmbariManagementController.class))).thenReturn(resourceProviderMock);
+
+ PowerMockito.mockStatic(PropertyHelper.class);
+ Request requestMock = mock(Request.class);
+ when(PropertyHelper.getCreateRequest(any(), any())).thenReturn(requestMock);
+ when(PropertyHelper.getPropertyId("Requests", "id")).thenReturn("requestIdProp");
+
+ RequestStatus requestStatusMock = mock(RequestStatus.class);
+ Resource responseResourceMock = mock(Resource.class);
+ when(resourceProviderMock.createResources(requestMock)).thenReturn(requestStatusMock);
+ when(requestStatusMock.getRequestResource()).thenReturn(responseResourceMock);
+ when(responseResourceMock.getPropertyValue(anyString())).thenReturn(100l);
+
+ Clusters clustersMock = mock(Clusters.class);
+ when(ambariManagementControllerMock.getClusters()).thenReturn(clustersMock);
+ Cluster clusterMock = mock(Cluster.class);
+ when(clustersMock.getCluster("c1")).thenReturn(clusterMock);
+ when(clusterMock.getHosts(eq("AMBARI_METRICS"), eq("METRICS_MONITOR"))).thenReturn(Collections.singleton("h1"));
+
+ RequestDAO requestDAOMock = mock(RequestDAO.class);
+ RequestEntity requestEntityMock = mock(RequestEntity.class);
+ when(requestDAOMock.findByPks(Collections.singleton(100l), true)).thenReturn(Collections.singletonList(requestEntityMock));
+ when(requestEntityMock.getStatus()).thenReturn(HostRoleStatus.IN_PROGRESS).thenReturn(HostRoleStatus.COMPLETED);
+
+ Field requestDaoField = m_check.getClass().getDeclaredField("requestDAO");
+ requestDaoField.setAccessible(true);
+ requestDaoField.set(m_check, requestDAOMock);
+
+ PrerequisiteCheck check = new PrerequisiteCheck(null, "c1");
+ PrereqCheckRequest request = new PrereqCheckRequest("c1");
+ UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = new UpgradePack.PrerequisiteCheckConfig();
+ UpgradePack.PrerequisiteProperty prerequisiteProperty = new UpgradePack.PrerequisiteProperty();
+ prerequisiteProperty.name = MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+ prerequisiteProperty.value = "2.7.0.0";
+ UpgradePack.PrerequisiteCheckProperties prerequisiteCheckProperties = new UpgradePack.PrerequisiteCheckProperties();
+ prerequisiteCheckProperties.name = "org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck";
+ prerequisiteCheckProperties.properties = Collections.singletonList(prerequisiteProperty);
+ prerequisiteCheckConfig.prerequisiteCheckProperties = Collections.singletonList(prerequisiteCheckProperties);
+ request.setPrerequisiteCheckConfig(prerequisiteCheckConfig);
+ request.setTargetRepositoryVersion(m_repositoryVersion);
+ m_check.perform(check, request);
+
+ Assert.assertEquals(PrereqCheckStatus.PASS, check.getStatus());
+ }
+
+ @Test(timeout = 60000)
+ public void testPerformFail() throws Exception{
+ AmbariManagementController ambariManagementControllerMock = Mockito.mock(AmbariManagementController.class);
+ PowerMockito.mockStatic(AmbariServer.class);
+ when(AmbariServer.getController()).thenReturn(ambariManagementControllerMock);
+
+ ResourceProvider resourceProviderMock = mock(ResourceProvider.class);
+ PowerMockito.mockStatic(AbstractControllerResourceProvider.class);
+ when(AbstractControllerResourceProvider.getResourceProvider(eq(Resource.Type.Request), any(AmbariManagementController.class))).thenReturn(resourceProviderMock);
+
+ PowerMockito.mockStatic(PropertyHelper.class);
+ Request requestMock = mock(Request.class);
+ when(PropertyHelper.getCreateRequest(any(), any())).thenReturn(requestMock);
+ when(PropertyHelper.getPropertyId("Requests", "id")).thenReturn("requestIdProp");
+
+ RequestStatus requestStatusMock = mock(RequestStatus.class);
+ Resource responseResourceMock = mock(Resource.class);
+ when(resourceProviderMock.createResources(requestMock)).thenReturn(requestStatusMock);
+ when(requestStatusMock.getRequestResource()).thenReturn(responseResourceMock);
+ when(responseResourceMock.getPropertyValue(anyString())).thenReturn(101l);
+
+ Clusters clustersMock = mock(Clusters.class);
+ when(ambariManagementControllerMock.getClusters()).thenReturn(clustersMock);
+ Cluster clusterMock = mock(Cluster.class);
+ when(clustersMock.getCluster("c1")).thenReturn(clusterMock);
+ when(clusterMock.getHosts(eq("AMBARI_METRICS"), eq("METRICS_MONITOR"))).thenReturn(Collections.singleton("h1_fail"));
+
+ RequestDAO requestDAOMock = mock(RequestDAO.class);
+ RequestEntity requestEntityMock = mock(RequestEntity.class);
+ when(requestDAOMock.findByPks(Collections.singleton(101l), true)).thenReturn(Collections.singletonList(requestEntityMock));
+ when(requestEntityMock.getStatus()).thenReturn(HostRoleStatus.IN_PROGRESS).thenReturn(HostRoleStatus.FAILED);
+
+ Field requestDaoField = m_check.getClass().getDeclaredField("requestDAO");
+ requestDaoField.setAccessible(true);
+ requestDaoField.set(m_check, requestDAOMock);
+
+
+ when(requestEntityMock.getRequestId()).thenReturn(101l);
+ HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+ HostRoleCommandEntity hrcEntityMock = mock(HostRoleCommandEntity.class);
+ when(hostRoleCommandDAOMock.findByRequest(101l, true)).thenReturn(Collections.singletonList(hrcEntityMock));
+ when(hrcEntityMock.getStatus()).thenReturn(HostRoleStatus.FAILED);
+ when(hrcEntityMock.getHostName()).thenReturn("h1_fail");
+
+ Field hrcDaoField = m_check.getClass().getDeclaredField("hostRoleCommandDAO");
+ hrcDaoField.setAccessible(true);
+ hrcDaoField.set(m_check, hostRoleCommandDAOMock);
+
+ PrerequisiteCheck check = new PrerequisiteCheck(null, "c1");
+ PrereqCheckRequest request = new PrereqCheckRequest("c1");
+ UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = new UpgradePack.PrerequisiteCheckConfig();
+ UpgradePack.PrerequisiteProperty prerequisiteProperty = new UpgradePack.PrerequisiteProperty();
+ prerequisiteProperty.name = MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+ prerequisiteProperty.value = "2.7.0.0";
+ UpgradePack.PrerequisiteCheckProperties prerequisiteCheckProperties = new UpgradePack.PrerequisiteCheckProperties();
+ prerequisiteCheckProperties.name = "org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck";
+ prerequisiteCheckProperties.properties = Collections.singletonList(prerequisiteProperty);
+ prerequisiteCheckConfig.prerequisiteCheckProperties = Collections.singletonList(prerequisiteCheckProperties);
+ request.setPrerequisiteCheckConfig(prerequisiteCheckConfig);
+ request.setTargetRepositoryVersion(m_repositoryVersion);
+ m_check.perform(check, request);
+
+ Assert.assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+ Assert.assertTrue(check.getFailReason().contains("upgrade 'ambari-metrics-hadoop-sink'"));
+ Assert.assertEquals(check.getFailedOn().size(), 1);
+ Assert.assertTrue(check.getFailedOn().iterator().next().contains("h1_fail"));
+ }
+}