author     Myroslav Papirkovskyi <mpapirkovskyy@apache.org>    2018-05-30 21:54:36 +0300
committer  GitHub <noreply@github.com>                         2018-05-30 21:54:36 +0300
commit     395714e693eacf4881d8c89d537067c55db99791 (patch)
tree       bc607541b9fc46d570f3826904fadd1dc0aa1139
parent     8d724d7d90c40504ce91b5f09de35e2467e082f9 (diff)
AMBARI-23980. Agent can send too large websocket message with command output. (mpapirkovskyy) (#1413)
-rw-r--r--  ambari-agent/conf/unix/ambari-agent.ini                               2
-rw-r--r--  ambari-agent/conf/windows/ambari-agent.ini                            2
-rw-r--r--  ambari-agent/src/main/python/ambari_agent/AmbariConfig.py             6
-rw-r--r--  ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py       58
-rw-r--r--  ambari-agent/src/main/python/ambari_agent/Grep.py                     14
-rw-r--r--  ambari-agent/src/main/python/ambari_agent/PythonExecutor.py            7
-rw-r--r--  ambari-agent/src/test/python/ambari_agent/TestActionQueue.py          12
-rw-r--r--  ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py    78
-rw-r--r--  ambari-agent/src/test/python/ambari_agent/TestGrep.py                 31
-rw-r--r--  ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py       14
10 files changed, 195 insertions, 29 deletions
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 8596790691..2e403a0099 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -72,7 +72,7 @@ dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
/var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie,
/var/log/hadoop,/var/log/zookeeper,/var/log/hbase,/var/run/templeton,/var/log/hive
; 0 - unlimited
-log_lines_count=300
+log_max_symbols_size=900000
idle_interval_min=1
idle_interval_max=10
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index 42d65c14d9..189daff7ec 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -52,6 +52,6 @@ dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
/var/log/nagios
rpms=hadoop,hadoop-lzo,hbase,oozie,sqoop,pig,zookeeper,hive,libconfuse,ambari-log4j
; 0 - unlimited
-log_lines_count=300
+log_max_symbols_size=900000
idle_interval_min=1
idle_interval_max=10
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 88aa8ea211..68cb593bb0 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -69,7 +69,7 @@ passphrase_env_var_name=AMBARI_PASSPHRASE
[heartbeat]
state_interval = 1
dirs={ps}etc{ps}hadoop,{ps}etc{ps}hadoop{ps}conf,{ps}var{ps}run{ps}hadoop,{ps}var{ps}log{ps}hadoop
-log_lines_count=300
+log_max_symbols_size=900000
iddle_interval_min=1
iddle_interval_max=10
@@ -180,6 +180,10 @@ class AmbariConfig:
return int(self.get('heartbeat', 'state_interval_seconds', '60'))
@property
+ def log_max_symbols_size(self):
+ return int(self.get('heartbeat', 'log_max_symbols_size', '900000'))
+
+ @property
def cache_dir(self):
return self.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
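
For reference, the new property resolves against the [heartbeat] section and falls back to 900000 characters when the key is missing; as in the ini files above, a value of 0 means unlimited output. A minimal sketch of reading it (constructing AmbariConfig() directly, as the agent tests further down do, and the import path are the only assumptions here):

# Sketch: reading the new limit through the property added above.
from ambari_agent.AmbariConfig import AmbariConfig  # import path is illustrative

config = AmbariConfig()                # loads the built-in default configuration
limit = config.log_max_symbols_size    # 900000 unless overridden in ambari-agent.ini
if limit:
    print("command output will be trimmed to %d characters" % limit)
else:
    print("0 means unlimited output")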
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index a30063e4d9..84af058c8f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -22,6 +22,9 @@ import os
import logging
import threading
import copy
+
+import ambari_simplejson as json
+
from collections import defaultdict
from Grep import Grep
@@ -38,6 +41,9 @@ class CommandStatusDict():
task_id -> (command, cmd_report)
"""
+ # 2MB is a max message size on the server side
+ MAX_REPORT_SIZE = 1950000
+
def __init__(self, initializer_module):
"""
callback_action is called every time when status of some command is
@@ -48,6 +54,7 @@ class CommandStatusDict():
self.initializer_module = initializer_module
self.command_update_output = initializer_module.config.command_update_output
self.server_responses_listener = initializer_module.server_responses_listener
+ self.log_max_symbols_size = initializer_module.config.log_max_symbols_size
self.reported_reports = set()
@@ -91,10 +98,31 @@ class CommandStatusDict():
report = self.generate_report()
if report:
- success, correlation_id = self.force_update_to_server(report)
-
- if success:
- self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports()
+ for splitted_report in self.split_reports(report, CommandStatusDict.MAX_REPORT_SIZE):
+ success, correlation_id = self.force_update_to_server(splitted_report)
+
+ if success:
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports(splitted_report)
+
+ def split_reports(self, result_reports, size):
+ part = defaultdict(lambda:[])
+ prev_part = defaultdict(lambda:[])
+ for cluster_id, cluster_reports in result_reports.items():
+ for report in cluster_reports:
+ prev_part[cluster_id].append(report)
+ if self.size_approved(prev_part, size):
+ part[cluster_id].append(report)
+ else:
+ yield part
+ part = defaultdict(lambda:[])
+ prev_part = defaultdict(lambda:[])
+ prev_part[cluster_id].append(report)
+ part[cluster_id].append(report)
+ yield part
+
+ def size_approved(self, report, size):
+ report_json = json.dumps(report)
+ return len(report_json) <= size
def get_command_status(self, taskId):
with self.lock:
@@ -128,11 +156,22 @@ class CommandStatusDict():
pass
return resultReports
- def clear_reported_reports(self):
+ def clear_reported_reports(self, result_reports):
with self.lock:
+ keys_to_remove = set()
for key in self.reported_reports:
- del self.current_state[key]
- self.reported_reports = set()
+ if self.has_report_with_taskid(key, result_reports):
+ del self.current_state[key]
+ keys_to_remove.add(key)
+
+ self.reported_reports = self.reported_reports.difference(keys_to_remove)
+
+ def has_report_with_taskid(self, task_id, result_reports):
+ for cluster_reports in result_reports.values():
+ for report in cluster_reports:
+ if report['taskId'] == task_id:
+ return True
+ return False
def generate_in_progress_report(self, command, report):
"""
@@ -153,11 +192,12 @@ class CommandStatusDict():
tmpout, tmperr, tmpstructuredout = files_content
grep = Grep()
- output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
+ output = grep.tail_by_symbols(grep.tail(tmpout, Grep.OUTPUT_LAST_LINES), self.log_max_symbols_size)
+ err = grep.tail_by_symbols(grep.tail(tmperr, Grep.OUTPUT_LAST_LINES), self.log_max_symbols_size)
inprogress = self.generate_report_template(command)
inprogress.update({
'stdout': output,
- 'stderr': tmperr,
+ 'stderr': err,
'structuredOut': tmpstructuredout,
'exitCode': 777,
'status': ActionQueue.IN_PROGRESS_STATUS,
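
Taken on its own, split_reports is a greedy packer: it serializes the part built so far plus the next report, keeps the report if the JSON form still fits the limit, and otherwise yields the current part and starts a new one seeded with that report. A standalone sketch of the same logic (stdlib json stands in for ambari_simplejson here):

import json
from collections import defaultdict

def split_reports(result_reports, size):
    """Yield cluster_id -> [reports] chunks whose JSON form stays within `size` characters."""
    part = defaultdict(list)        # reports confirmed to fit into the current chunk
    candidate = defaultdict(list)   # current chunk plus the report being considered
    for cluster_id, cluster_reports in result_reports.items():
        for report in cluster_reports:
            candidate[cluster_id].append(report)
            if len(json.dumps(candidate)) <= size:
                part[cluster_id].append(report)       # still fits, keep it
            else:
                yield part                            # flush the chunk that fit
                part = defaultdict(list)
                candidate = defaultdict(list)
                candidate[cluster_id].append(report)  # start the next chunk with this report
                part[cluster_id].append(report)
    yield part

# With a 60-character limit each of these reports ends up in its own chunk.
reports = {'1': [{'status': 'FAILED', 'taskId': 3}, {'status': 'FAILED', 'taskId': 4}]}
for chunk in split_reports(reports, 60):
    print(dict(chunk))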
diff --git a/ambari-agent/src/main/python/ambari_agent/Grep.py b/ambari-agent/src/main/python/ambari_agent/Grep.py
index 1aaf40d75c..70bc4b9e0b 100644
--- a/ambari-agent/src/main/python/ambari_agent/Grep.py
+++ b/ambari-agent/src/main/python/ambari_agent/Grep.py
@@ -74,3 +74,17 @@ class Grep:
length = len(lines)
tailed = lines[length - n:]
return "".join(tailed)
+
+ def tail_by_symbols(self, string, n):
+ """
+ Copies last n symbols with trimming by rows from string to result. Also, string trim is performed.
+ """
+ stripped_string = string.strip()
+ lines = stripped_string.splitlines(True)
+ tailed = []
+ for line in reversed(lines):
+ if len("".join(tailed) + line) <= n:
+ tailed[:0] = line
+ else:
+ break
+ return "".join(tailed)
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 771578f832..8b9e9ca26b 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -51,6 +51,7 @@ class PythonExecutor(object):
self.python_process_has_been_killed = False
self.tmpDir = tmpDir
self.config = config
+ self.log_max_symbols_size = self.config.log_max_symbols_size
pass
@@ -189,12 +190,10 @@ class PythonExecutor(object):
return python_command
def condenseOutput(self, stdout, stderr, retcode, structured_out):
- log_lines_count = self.config.get('heartbeat', 'log_lines_count')
-
result = {
"exitcode": retcode,
- "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
- "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
+ "stdout": self.grep.tail_by_symbols(stdout, self.log_max_symbols_size) if self.log_max_symbols_size else stdout,
+ "stderr": self.grep.tail_by_symbols(stderr, self.log_max_symbols_size) if self.log_max_symbols_size else stderr,
"structuredOut" : structured_out
}
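
With this change condenseOutput trims both streams by character count through the same helper, and a limit of 0 still disables trimming altogether. A rough usage sketch with a stub config (the stub, the import path, and the assumption that the constructor only needs log_max_symbols_size from the config object are illustrative only):

from ambari_agent.PythonExecutor import PythonExecutor  # import path is illustrative

class StubConfig(object):
    log_max_symbols_size = 20   # tiny limit so the trimming is visible

executor = PythonExecutor("/tmp", StubConfig())
result = executor.condenseOutput("a" * 50 + "\n" + "b" * 10, "err line\n", 0, {})
print(result["stdout"])    # "bbbbbbbbbb" - the 51-character first line no longer fits
print(result["exitcode"])  # 0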
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index c6b78cec30..af41fe8cfe 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -579,7 +579,7 @@ class TestActionQueue(TestCase):
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
-
+ config.set('heartbeat', 'log_symbols_count', "900000")
initializer_module = InitializerModule()
initializer_module.init()
initializer_module.config = config
@@ -661,7 +661,7 @@ class TestActionQueue(TestCase):
self.assertEqual(reports[0], expected)
# now should not have reports (read complete/failed reports are deleted)
- actionQueue.commandStatuses.clear_reported_reports()
+ actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports})
reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
self.assertEqual(len(reports), 0)
@@ -680,7 +680,7 @@ class TestActionQueue(TestCase):
reports[0]['status'] == 'IN_PROGRESS':
time.sleep(0.1)
reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
- actionQueue.commandStatuses.clear_reported_reports()
+ actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports})
# check report
expected = {'status': 'FAILED',
@@ -698,7 +698,7 @@ class TestActionQueue(TestCase):
self.assertEqual(reports[0], expected)
# now should not have reports (read complete/failed reports are deleted)
- actionQueue.commandStatuses.clear_reported_reports()
+ actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports})
reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
self.assertEqual(len(reports), 0)
@@ -715,7 +715,7 @@ class TestActionQueue(TestCase):
reports[0]['status'] == 'IN_PROGRESS':
time.sleep(0.1)
reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
- actionQueue.commandStatuses.clear_reported_reports()
+ actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports})
# check report
expected = {'status': 'COMPLETED',
'stderr': 'stderr',
@@ -732,7 +732,7 @@ class TestActionQueue(TestCase):
self.assertEqual(reports[0], expected)
# now should not have reports (read complete/failed reports are deleted)
- actionQueue.commandStatuses.clear_reported_reports()
+ actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports})
reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
self.assertEqual(len(reports), 0)
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
index 44121dbb05..f2f2eda3bf 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
@@ -185,3 +185,81 @@ class TestCommandStatusDict:#(TestCase):
'actionId': '1-1', 'taskId': 5, 'exitCode': 777}]
}
self.assertEquals(report, expected)
+
+ def test_size_approved(self):
+ # as json: '{"status": "IN_PROGRESS", "structuredOut": "structured_out.tmp", "taskId": 5}', length=77
+ command_in_progress_report = {
+ 'status': 'IN_PROGRESS',
+ 'taskId': 5,
+ 'structuredOut' : 'structured_out.tmp',
+ }
+ mock = MagicMock()
+ command_statuses = CommandStatusDict(mock)
+ self.assertEqual(command_statuses.size_approved(command_in_progress_report, 78), True)
+ self.assertEqual(command_statuses.size_approved(command_in_progress_report, 77), True)
+ self.assertEqual(command_statuses.size_approved(command_in_progress_report, 76), False)
+
+ def test_split_reports(self):
+ # 4 reports for each cluster, general size in json = 295
+ generated_reports = \
+ {'1': [{'status': 'FAILED', 'taskId': 3},
+ {'status': 'FAILED', 'taskId': 4},
+ {'status': 'FAILED', 'taskId': 5},
+ {'status': 'FAILED', 'taskId': 6}],
+ '2': [{'status': 'FAILED', 'taskId': 7},
+ {'status': 'FAILED', 'taskId': 8},
+ {'status': 'FAILED', 'taskId': 9},
+ {'status': 'FAILED', 'taskId': 10}],
+ }
+ mock = MagicMock()
+ command_statuses = CommandStatusDict(mock)
+
+ # all reports will be send at once
+ splitted_reports = []
+ for report in command_statuses.split_reports(generated_reports, 295):
+ splitted_reports.append(report)
+
+ self.assertEqual(len(splitted_reports), 1)
+ self.assertEqual(len(splitted_reports[0]), 2)
+ self.assertEqual(len(splitted_reports[0]['1']), 4)
+ self.assertEqual(len(splitted_reports[0]['2']), 4)
+
+ # all reports will be divided between two parts
+ # {'1': [{3}, {4}, {5}, {6}], '2': [{7}, {8}, {9}]}
+ # {'2': [{10}]}
+ splitted_reports = []
+ for report in command_statuses.split_reports(generated_reports, 294):
+ splitted_reports.append(report)
+
+ self.assertEqual(len(splitted_reports), 2)
+ self.assertEqual(len(splitted_reports[0]), 2)
+ self.assertEqual(len(splitted_reports[0]['1']), 4)
+ self.assertEqual(len(splitted_reports[0]['2']), 3)
+ self.assertEqual(len(splitted_reports[1]), 1)
+ self.assertEqual(len(splitted_reports[1]['2']), 1)
+
+ # all reports will be divided between 8 parts
+ # {'1': [{3}]}
+ #...
+ # {'2': [{10}]}
+ splitted_reports = []
+ for report in command_statuses.split_reports(generated_reports, 73):
+ splitted_reports.append(report)
+
+ self.assertEqual(len(splitted_reports), 8)
+ self.assertEqual(len(splitted_reports[0]), 1)
+ self.assertEqual(len(splitted_reports[0]['1']), 1)
+ self.assertEqual(len(splitted_reports[1]), 1)
+ self.assertEqual(len(splitted_reports[1]['1']), 1)
+ self.assertEqual(len(splitted_reports[2]), 1)
+ self.assertEqual(len(splitted_reports[2]['1']), 1)
+ self.assertEqual(len(splitted_reports[3]), 1)
+ self.assertEqual(len(splitted_reports[3]['1']), 1)
+ self.assertEqual(len(splitted_reports[4]), 1)
+ self.assertEqual(len(splitted_reports[4]['2']), 1)
+ self.assertEqual(len(splitted_reports[5]), 1)
+ self.assertEqual(len(splitted_reports[5]['2']), 1)
+ self.assertEqual(len(splitted_reports[6]), 1)
+ self.assertEqual(len(splitted_reports[6]['2']), 1)
+ self.assertEqual(len(splitted_reports[7]), 1)
+ self.assertEqual(len(splitted_reports[7]['2']), 1)
diff --git a/ambari-agent/src/test/python/ambari_agent/TestGrep.py b/ambari-agent/src/test/python/ambari_agent/TestGrep.py
index 4f681ca523..dbb723ab4d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestGrep.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestGrep.py
@@ -95,6 +95,37 @@ debug: Processing report from ambari-dmi with processor Puppet::Reports::Store
desired = ''
self.assertEquals(fragment, desired, 'Grep tail function contains bug in index arithmetics')
+
+ def test_tail_by_symbols_many_lines(self):
+ desired_size = len(self.string_good.strip())
+ fragment = self.grep.tail_by_symbols(self.string_good, desired_size)
+ desired = self.string_good.strip()
+ self.assertEquals(fragment, desired, "Grep tail function should return all symbols if there are less symbols than n")
+ self.assertEquals(len(fragment), desired_size, "Grep tail function should return all symbols if there are less symbols than n")
+
+ def test_tail_by_symbols_few_lines(self):
+ original = """
+debug: Finishing transaction 70060456663980
+debug: Received report to process from ambari-dmi
+debug: Processing report from ambari-dmi with processor Puppet::Reports::Store
+"""
+ desired = original.replace("\n", os.linesep).strip()
+ desired_size = len(original)
+
+ fragment = self.grep.tail_by_symbols(self.string_good, desired_size)
+ self.assertEquals(fragment, desired, "Grep tail function should return only last 3 lines of file")
+
+ fragment = self.grep.tail_by_symbols(self.string_good, desired_size - 1)
+ self.assertEquals(fragment, desired, "Grep tail function should return only last 2 lines of file")
+
+ fragment = self.grep.tail_by_symbols(self.string_good, desired_size + 1)
+ self.assertEquals(fragment, desired, "Grep tail function should return only last 3 lines of file")
+
+ def test_tail_by_symbols_no_lines(self):
+ fragment = self.grep.tail_by_symbols("", 3)
+ desired = ''
+ self.assertEquals(fragment, desired, 'Grep tail function should return "" for empty string')
+
def tearDown(self):
pass
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index 08cf06eb8f..472bf4c335 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -43,7 +43,7 @@ class TestPythonExecutor(TestCase):
Tests whether watchdog works
"""
subproc_mock = self.subprocess32_mockup()
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
_, tmpoutfile = tempfile.mkstemp()
_, tmperrfile = tempfile.mkstemp()
_, tmpstrucout = tempfile.mkstemp()
@@ -76,7 +76,7 @@ class TestPythonExecutor(TestCase):
Tries to catch false positive watchdog invocations
"""
subproc_mock = self.subprocess32_mockup()
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
_, tmpoutfile = tempfile.mkstemp()
_, tmperrfile = tempfile.mkstemp()
_, tmpstrucout = tempfile.mkstemp()
@@ -107,7 +107,7 @@ class TestPythonExecutor(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
def test_execution_results(self):
subproc_mock = self.subprocess32_mockup()
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
_, tmpoutfile = tempfile.mkstemp()
_, tmperrfile = tempfile.mkstemp()
@@ -137,7 +137,7 @@ class TestPythonExecutor(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
def test_is_successfull(self):
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
executor.python_process_has_been_killed = False
self.assertTrue(executor.isSuccessfull(0))
@@ -150,7 +150,7 @@ class TestPythonExecutor(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
def test_python_command(self):
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
command = executor.python_command("script", ["script_param1"])
self.assertEqual(3, len(command))
self.assertTrue("python" in command[0].lower())
@@ -164,7 +164,7 @@ class TestPythonExecutor(TestCase):
# Test case when previous log file is absent
isfile_mock.return_value = False
log_file = "/var/lib/ambari-agent/data/output-13.txt"
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
executor.back_up_log_file_if_exists(log_file)
self.assertEquals(isfile_mock.called, True)
self.assertEquals(rename_mock.called, False)
@@ -174,7 +174,7 @@ class TestPythonExecutor(TestCase):
# Test case when 3 previous log files are absent
isfile_mock.side_effect = [True, True, True, False]
log_file = "/var/lib/ambari-agent/data/output-13.txt"
- executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
+ executor = PythonExecutor("/tmp", AmbariConfig())
executor.back_up_log_file_if_exists(log_file)
self.assertEquals(isfile_mock.called, True)
self.assertEquals(rename_mock.call_args_list[0][0][0], "/var/lib/ambari-agent/data/output-13.txt")