diff options
author | Myroslav Papirkovskyi <mpapirkovskyy@apache.org> | 2018-05-30 21:54:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-30 21:54:36 +0300 |
commit | 395714e693eacf4881d8c89d537067c55db99791 (patch) | |
tree | bc607541b9fc46d570f3826904fadd1dc0aa1139 | |
parent | 8d724d7d90c40504ce91b5f09de35e2467e082f9 (diff) |
AMBARI-23980. Agent can send too large websocket message with command output. (mpapirkovskyy) (#1413)
10 files changed, 195 insertions, 29 deletions
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 8596790691..2e403a0099 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -72,7 +72,7 @@ dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie, /var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie, /var/log/hadoop,/var/log/zookeeper,/var/log/hbase,/var/run/templeton,/var/log/hive ; 0 - unlimited -log_lines_count=300 +log_max_symbols_size=900000 idle_interval_min=1 idle_interval_max=10 diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini index 42d65c14d9..189daff7ec 100644 --- a/ambari-agent/conf/windows/ambari-agent.ini +++ b/ambari-agent/conf/windows/ambari-agent.ini @@ -52,6 +52,6 @@ dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie, /var/log/nagios rpms=hadoop,hadoop-lzo,hbase,oozie,sqoop,pig,zookeeper,hive,libconfuse,ambari-log4j ; 0 - unlimited -log_lines_count=300 +log_max_symbols_size=900000 idle_interval_min=1 idle_interval_max=10 diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 88aa8ea211..68cb593bb0 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -69,7 +69,7 @@ passphrase_env_var_name=AMBARI_PASSPHRASE [heartbeat] state_interval = 1 dirs={ps}etc{ps}hadoop,{ps}etc{ps}hadoop{ps}conf,{ps}var{ps}run{ps}hadoop,{ps}var{ps}log{ps}hadoop -log_lines_count=300 +log_max_symbols_size=900000 iddle_interval_min=1 iddle_interval_max=10 @@ -180,6 +180,10 @@ class AmbariConfig: return int(self.get('heartbeat', 'state_interval_seconds', '60')) @property + def log_max_symbols_size(self): + return int(self.get('heartbeat', 'log_max_symbols_size', '900000')) + + @property def cache_dir(self): return self.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index a30063e4d9..84af058c8f 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -22,6 +22,9 @@ import os import logging import threading import copy + +import ambari_simplejson as json + from collections import defaultdict from Grep import Grep @@ -38,6 +41,9 @@ class CommandStatusDict(): task_id -> (command, cmd_report) """ + # 2MB is a max message size on the server side + MAX_REPORT_SIZE = 1950000 + def __init__(self, initializer_module): """ callback_action is called every time when status of some command is @@ -48,6 +54,7 @@ class CommandStatusDict(): self.initializer_module = initializer_module self.command_update_output = initializer_module.config.command_update_output self.server_responses_listener = initializer_module.server_responses_listener + self.log_max_symbols_size = initializer_module.config.log_max_symbols_size self.reported_reports = set() @@ -91,10 +98,31 @@ class CommandStatusDict(): report = self.generate_report() if report: - success, correlation_id = self.force_update_to_server(report) - - if success: - self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports() + for splitted_report in self.split_reports(report, CommandStatusDict.MAX_REPORT_SIZE): + success, correlation_id = self.force_update_to_server(splitted_report) + + if success: + self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.clear_reported_reports(splitted_report) + + def split_reports(self, result_reports, size): + part = defaultdict(lambda:[]) + prev_part = defaultdict(lambda:[]) + for cluster_id, cluster_reports in result_reports.items(): + for report in cluster_reports: + prev_part[cluster_id].append(report) + if self.size_approved(prev_part, size): + part[cluster_id].append(report) + else: + yield part + part = defaultdict(lambda:[]) + prev_part = defaultdict(lambda:[]) + prev_part[cluster_id].append(report) + part[cluster_id].append(report) + yield part + + def size_approved(self, report, size): + report_json = json.dumps(report) + return len(report_json) <= size def get_command_status(self, taskId): with self.lock: @@ -128,11 +156,22 @@ class CommandStatusDict(): pass return resultReports - def clear_reported_reports(self): + def clear_reported_reports(self, result_reports): with self.lock: + keys_to_remove = set() for key in self.reported_reports: - del self.current_state[key] - self.reported_reports = set() + if self.has_report_with_taskid(key, result_reports): + del self.current_state[key] + keys_to_remove.add(key) + + self.reported_reports = self.reported_reports.difference(keys_to_remove) + + def has_report_with_taskid(self, task_id, result_reports): + for cluster_reports in result_reports.values(): + for report in cluster_reports: + if report['taskId'] == task_id: + return True + return False def generate_in_progress_report(self, command, report): """ @@ -153,11 +192,12 @@ class CommandStatusDict(): tmpout, tmperr, tmpstructuredout = files_content grep = Grep() - output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES) + output = grep.tail_by_symbols(grep.tail(tmpout, Grep.OUTPUT_LAST_LINES), self.log_max_symbols_size) + err = grep.tail_by_symbols(grep.tail(tmperr, Grep.OUTPUT_LAST_LINES), self.log_max_symbols_size) inprogress = self.generate_report_template(command) inprogress.update({ 'stdout': output, - 'stderr': tmperr, + 'stderr': err, 'structuredOut': tmpstructuredout, 'exitCode': 777, 'status': ActionQueue.IN_PROGRESS_STATUS, diff --git a/ambari-agent/src/main/python/ambari_agent/Grep.py b/ambari-agent/src/main/python/ambari_agent/Grep.py index 1aaf40d75c..70bc4b9e0b 100644 --- a/ambari-agent/src/main/python/ambari_agent/Grep.py +++ b/ambari-agent/src/main/python/ambari_agent/Grep.py @@ -74,3 +74,17 @@ class Grep: length = len(lines) tailed = lines[length - n:] return "".join(tailed) + + def tail_by_symbols(self, string, n): + """ + Copies last n symbols with trimming by rows from string to result. Also, string trim is performed. + """ + stripped_string = string.strip() + lines = stripped_string.splitlines(True) + tailed = [] + for line in reversed(lines): + if len("".join(tailed) + line) <= n: + tailed[:0] = line + else: + break + return "".join(tailed) diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index 771578f832..8b9e9ca26b 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -51,6 +51,7 @@ class PythonExecutor(object): self.python_process_has_been_killed = False self.tmpDir = tmpDir self.config = config + self.log_max_symbols_size = self.config.log_max_symbols_size pass @@ -189,12 +190,10 @@ class PythonExecutor(object): return python_command def condenseOutput(self, stdout, stderr, retcode, structured_out): - log_lines_count = self.config.get('heartbeat', 'log_lines_count') - result = { "exitcode": retcode, - "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout, - "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr, + "stdout": self.grep.tail_by_symbols(stdout, self.log_max_symbols_size) if self.log_max_symbols_size else stdout, + "stderr": self.grep.tail_by_symbols(stderr, self.log_max_symbols_size) if self.log_max_symbols_size else stderr, "structuredOut" : structured_out } diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index c6b78cec30..af41fe8cfe 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -579,7 +579,7 @@ class TestActionQueue(TestCase): config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") - + config.set('heartbeat', 'log_symbols_count', "900000") initializer_module = InitializerModule() initializer_module.init() initializer_module.config = config @@ -661,7 +661,7 @@ class TestActionQueue(TestCase): self.assertEqual(reports[0], expected) # now should not have reports (read complete/failed reports are deleted) - actionQueue.commandStatuses.clear_reported_reports() + actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports}) reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID] self.assertEqual(len(reports), 0) @@ -680,7 +680,7 @@ class TestActionQueue(TestCase): reports[0]['status'] == 'IN_PROGRESS': time.sleep(0.1) reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID] - actionQueue.commandStatuses.clear_reported_reports() + actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports}) # check report expected = {'status': 'FAILED', @@ -698,7 +698,7 @@ class TestActionQueue(TestCase): self.assertEqual(reports[0], expected) # now should not have reports (read complete/failed reports are deleted) - actionQueue.commandStatuses.clear_reported_reports() + actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports}) reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID] self.assertEqual(len(reports), 0) @@ -715,7 +715,7 @@ class TestActionQueue(TestCase): reports[0]['status'] == 'IN_PROGRESS': time.sleep(0.1) reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID] - actionQueue.commandStatuses.clear_reported_reports() + actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports}) # check report expected = {'status': 'COMPLETED', 'stderr': 'stderr', @@ -732,7 +732,7 @@ class TestActionQueue(TestCase): self.assertEqual(reports[0], expected) # now should not have reports (read complete/failed reports are deleted) - actionQueue.commandStatuses.clear_reported_reports() + actionQueue.commandStatuses.clear_reported_reports({CLUSTER_ID: reports}) reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID] self.assertEqual(len(reports), 0) diff --git a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py index 44121dbb05..f2f2eda3bf 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py @@ -185,3 +185,81 @@ class TestCommandStatusDict:#(TestCase): 'actionId': '1-1', 'taskId': 5, 'exitCode': 777}] } self.assertEquals(report, expected) + + def test_size_approved(self): + # as json: '{"status": "IN_PROGRESS", "structuredOut": "structured_out.tmp", "taskId": 5}', length=77 + command_in_progress_report = { + 'status': 'IN_PROGRESS', + 'taskId': 5, + 'structuredOut' : 'structured_out.tmp', + } + mock = MagicMock() + command_statuses = CommandStatusDict(mock) + self.assertEqual(command_statuses.size_approved(command_in_progress_report, 78), True) + self.assertEqual(command_statuses.size_approved(command_in_progress_report, 77), True) + self.assertEqual(command_statuses.size_approved(command_in_progress_report, 76), False) + + def test_split_reports(self): + # 4 reports for each cluster, general size in json = 295 + generated_reports = \ + {'1': [{'status': 'FAILED', 'taskId': 3}, + {'status': 'FAILED', 'taskId': 4}, + {'status': 'FAILED', 'taskId': 5}, + {'status': 'FAILED', 'taskId': 6}], + '2': [{'status': 'FAILED', 'taskId': 7}, + {'status': 'FAILED', 'taskId': 8}, + {'status': 'FAILED', 'taskId': 9}, + {'status': 'FAILED', 'taskId': 10}], + } + mock = MagicMock() + command_statuses = CommandStatusDict(mock) + + # all reports will be send at once + splitted_reports = [] + for report in command_statuses.split_reports(generated_reports, 295): + splitted_reports.append(report) + + self.assertEqual(len(splitted_reports), 1) + self.assertEqual(len(splitted_reports[0]), 2) + self.assertEqual(len(splitted_reports[0]['1']), 4) + self.assertEqual(len(splitted_reports[0]['2']), 4) + + # all reports will be divided between two parts + # {'1': [{3}, {4}, {5}, {6}], '2': [{7}, {8}, {9}]} + # {'2': [{10}]} + splitted_reports = [] + for report in command_statuses.split_reports(generated_reports, 294): + splitted_reports.append(report) + + self.assertEqual(len(splitted_reports), 2) + self.assertEqual(len(splitted_reports[0]), 2) + self.assertEqual(len(splitted_reports[0]['1']), 4) + self.assertEqual(len(splitted_reports[0]['2']), 3) + self.assertEqual(len(splitted_reports[1]), 1) + self.assertEqual(len(splitted_reports[1]['2']), 1) + + # all reports will be divided between 8 parts + # {'1': [{3}]} + #... + # {'2': [{10}]} + splitted_reports = [] + for report in command_statuses.split_reports(generated_reports, 73): + splitted_reports.append(report) + + self.assertEqual(len(splitted_reports), 8) + self.assertEqual(len(splitted_reports[0]), 1) + self.assertEqual(len(splitted_reports[0]['1']), 1) + self.assertEqual(len(splitted_reports[1]), 1) + self.assertEqual(len(splitted_reports[1]['1']), 1) + self.assertEqual(len(splitted_reports[2]), 1) + self.assertEqual(len(splitted_reports[2]['1']), 1) + self.assertEqual(len(splitted_reports[3]), 1) + self.assertEqual(len(splitted_reports[3]['1']), 1) + self.assertEqual(len(splitted_reports[4]), 1) + self.assertEqual(len(splitted_reports[4]['2']), 1) + self.assertEqual(len(splitted_reports[5]), 1) + self.assertEqual(len(splitted_reports[5]['2']), 1) + self.assertEqual(len(splitted_reports[6]), 1) + self.assertEqual(len(splitted_reports[6]['2']), 1) + self.assertEqual(len(splitted_reports[7]), 1) + self.assertEqual(len(splitted_reports[7]['2']), 1) diff --git a/ambari-agent/src/test/python/ambari_agent/TestGrep.py b/ambari-agent/src/test/python/ambari_agent/TestGrep.py index 4f681ca523..dbb723ab4d 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestGrep.py +++ b/ambari-agent/src/test/python/ambari_agent/TestGrep.py @@ -95,6 +95,37 @@ debug: Processing report from ambari-dmi with processor Puppet::Reports::Store desired = '' self.assertEquals(fragment, desired, 'Grep tail function contains bug in index arithmetics') + + def test_tail_by_symbols_many_lines(self): + desired_size = len(self.string_good.strip()) + fragment = self.grep.tail_by_symbols(self.string_good, desired_size) + desired = self.string_good.strip() + self.assertEquals(fragment, desired, "Grep tail function should return all symbols if there are less symbols than n") + self.assertEquals(len(fragment), desired_size, "Grep tail function should return all symbols if there are less symbols than n") + + def test_tail_by_symbols_few_lines(self): + original = """ +debug: Finishing transaction 70060456663980 +debug: Received report to process from ambari-dmi +debug: Processing report from ambari-dmi with processor Puppet::Reports::Store +""" + desired = original.replace("\n", os.linesep).strip() + desired_size = len(original) + + fragment = self.grep.tail_by_symbols(self.string_good, desired_size) + self.assertEquals(fragment, desired, "Grep tail function should return only last 3 lines of file") + + fragment = self.grep.tail_by_symbols(self.string_good, desired_size - 1) + self.assertEquals(fragment, desired, "Grep tail function should return only last 2 lines of file") + + fragment = self.grep.tail_by_symbols(self.string_good, desired_size + 1) + self.assertEquals(fragment, desired, "Grep tail function should return only last 3 lines of file") + + def test_tail_by_symbols_no_lines(self): + fragment = self.grep.tail_by_symbols("", 3) + desired = '' + self.assertEquals(fragment, desired, 'Grep tail function should return "" for empty string') + def tearDown(self): pass diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py index 08cf06eb8f..472bf4c335 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py +++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py @@ -43,7 +43,7 @@ class TestPythonExecutor(TestCase): Tests whether watchdog works """ subproc_mock = self.subprocess32_mockup() - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() _, tmpstrucout = tempfile.mkstemp() @@ -76,7 +76,7 @@ class TestPythonExecutor(TestCase): Tries to catch false positive watchdog invocations """ subproc_mock = self.subprocess32_mockup() - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() _, tmpstrucout = tempfile.mkstemp() @@ -107,7 +107,7 @@ class TestPythonExecutor(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) def test_execution_results(self): subproc_mock = self.subprocess32_mockup() - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) _, tmpoutfile = tempfile.mkstemp() _, tmperrfile = tempfile.mkstemp() @@ -137,7 +137,7 @@ class TestPythonExecutor(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) def test_is_successfull(self): - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) executor.python_process_has_been_killed = False self.assertTrue(executor.isSuccessfull(0)) @@ -150,7 +150,7 @@ class TestPythonExecutor(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) def test_python_command(self): - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) command = executor.python_command("script", ["script_param1"]) self.assertEqual(3, len(command)) self.assertTrue("python" in command[0].lower()) @@ -164,7 +164,7 @@ class TestPythonExecutor(TestCase): # Test case when previous log file is absent isfile_mock.return_value = False log_file = "/var/lib/ambari-agent/data/output-13.txt" - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) executor.back_up_log_file_if_exists(log_file) self.assertEquals(isfile_mock.called, True) self.assertEquals(rename_mock.called, False) @@ -174,7 +174,7 @@ class TestPythonExecutor(TestCase): # Test case when 3 previous log files are absent isfile_mock.side_effect = [True, True, True, False] log_file = "/var/lib/ambari-agent/data/output-13.txt" - executor = PythonExecutor("/tmp", AmbariConfig().getConfig()) + executor = PythonExecutor("/tmp", AmbariConfig()) executor.back_up_log_file_if_exists(log_file) self.assertEquals(isfile_mock.called, True) self.assertEquals(rename_mock.call_args_list[0][0][0], "/var/lib/ambari-agent/data/output-13.txt") |