1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
|
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import fnmatch
import json
import os
import params
import socket
import time
import traceback
from resource_management.core.shell import call
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.format import format
from resource_management.libraries.resources.hdfs_resource import HdfsResource
index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'
# folder location which contains the snapshot/core folder
index_location = default("/commandParams/solr_index_location", None)
# index version (available index versions: 6.6.2 and 7.3.1, second one is used by default)
index_version = default("/commandParams/solr_index_version", '6.6.2')
# if this flag is false, skip upgrade if the version is proper, you can force to re-run the tool with setting the flag to true
force = default("/commandParams/solr_index_upgrade_force", False)
# if this flag is true, then it will generate specific folder for every backup with a hostname suffix
# where "." chars replaced with "_"(e.g.: /my/path/backup_locationc7301_ambari_apache_org), that can be useful if different
# hosts share the same filesystem where the backup is stored.
shared_fs = default("/commandParams/solr_shared_fs", False)
# set verbose log for index migration (default: true)
debug = default("/commandParams/solr_migrate_debug", True)
# used for filtering folders in backup location (like: if the filter is ranger, that will include snapshot.ranger folder but won't include snapshot.hadoop_logs)
core_filter = default("/commandParams/solr_core_filter", None)
# used to filer out comma separated cores - can be useful if backup/resotre failed in some point
skip_cores = default("/commandParams/solr_skip_cores", "").split(",")
# delete write.lock file at the start of lucene index migration process
delete_lock_on_start = default("/commandParams/solr_delete_lock_on_start", True)
# if it used, then core filter will be used with snapshot.* folder pattern
backup_mode = default("/commandParams/solr_migrate_backup", True)
log_output = default("/commandParams/solr_migrate_logoutput", True)
# Solr colleection name (used for DELETE/BACKUP/RESTORE)
collection = default("/commandParams/solr_collection", "ranger_audits")
# it will be used in the snapshot name, if it's ranger, the snapshot folder will be snapshot.ranger
backup_name = default("/commandParams/solr_backup_name", "ranger")
request_async = default("/commandParams/solr_request_async", False)
request_tries = int(default("/commandParams/solr_request_tries", 30))
request_time_interval = int(default("/commandParams/solr_request_time_interval", 5))
check_hosts_default = True if params.security_enabled else False
check_hosts = default("/commandParams/solr_check_hosts", check_hosts_default)
skip_generate_restore_host_cores = default("/commandParams/solr_skip_generate_restore_host_cores", False)
solr_protocol = "https" if params.infra_solr_ssl_enabled else "http"
solr_port = format("{params.infra_solr_port}")
solr_base_url = format("{solr_protocol}://{params.hostname}:{params.infra_solr_port}/solr")
solr_datadir = params.infra_solr_datadir
solr_keep_backup=default("/commandParams/solr_keep_backup", False)
solr_num_shards = int(default("/commandParams/solr_shards", "0"))
solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
if solr_hdfs_path:
import functools
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import get_klist_path
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
# hadoop default parameters
hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
hadoop_bin = stack_select.get_hadoop_dir("sbin")
hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
hadoop_lib_home = stack_select.get_hadoop_dir("lib")
hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
dfs_type = default("/commandParams/dfs_type", "")
hdfs_site = params.config['configurations']['hdfs-site']
default_fs = params.config['configurations']['core-site']['fs.defaultFS']
#create partial functions with common arguments for every HdfsResource call
#to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code
HdfsResource = functools.partial(
HdfsResource,
user=params.infra_solr_user,
hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
security_enabled = params.security_enabled,
keytab = hdfs_user_keytab,
kinit_path_local = kinit_path_local,
hadoop_bin_dir = hadoop_bin_dir,
hadoop_conf_dir = hadoop_conf_dir,
principal_name = hdfs_principal_name,
hdfs_site = hdfs_site,
default_fs = default_fs,
immutable_paths = get_not_managed_resources(),
dfs_type = dfs_type
)
if params.security_enabled:
keytab = params.infra_solr_kerberos_keytab
principal = params.infra_solr_kerberos_principal
hostname_suffix = params.hostname.replace(".", "_")
HOST_CORES='host-cores'
CORE_HOST='core-host'
HOST_SHARDS='host-shards'
CORE_DATA='core-data'
if shared_fs:
index_location = format("{index_location}_{hostname_suffix}")
def get_files_by_pattern(directory, pattern):
for root, dirs, files in os.walk(directory):
for basename in files:
try:
matched = pattern.match(basename)
except AttributeError:
matched = fnmatch.fnmatch(basename, pattern)
if matched:
yield os.path.join(root, basename)
def create_solr_api_request_command(request_path, output=None, override_solr_base_url=None):
solr_url = format("{solr_base_url}/{request_path}") if override_solr_base_url is None else format("{override_solr_base_url}/{request_path}")
grep_cmd = " | grep 'solr_rs_status: 200'"
api_cmd = format("kinit -kt {keytab} {principal} && curl -w'solr_rs_status: %{{http_code}}' -k --negotiate -u : '{solr_url}'") \
if params.security_enabled else format("curl -w'solr_rs_status: %{{http_code}}' -k '{solr_url}'")
if output is not None:
api_cmd+=format(" -o {output}")
api_cmd+=grep_cmd
return api_cmd
def snapshot_status_check(request_cmd, json_output, snapshot_name, backup=True, log_output=True, tries=30, time_interval=5):
"""
Check BACKUP/RESTORE status until the response status will be successful or failed.
:param request_cmd: backup or restore api path
:param json_output: json file which will store the response output
:param snapshot_name: snapshot name, it will be used to check the proper status in the status response (backup: <snapshot_name>, restore: snapshot.<snapshot_name>)
:param backup: this flag is true if the check is against backup, otherwise it will be restore
:param log_output: print the output of the downloaded json file (backup/restore response)
:param tries: number of tries of the requests - it stops after the response status is successful for backup/restore
:param time_interval: time to wait in seconds between retries
"""
failed = True
num_tries = 0
for i in range(tries):
try:
num_tries+=1
if (num_tries > 1):
Logger.info(format("Number of tries: {num_tries} ..."))
Execute(request_cmd, user=params.infra_solr_user)
with open(json_output) as json_file:
json_data = json.load(json_file)
if backup:
details = json_data['details']
if 'backup' in details:
backup_list = details['backup']
if log_output:
Logger.info(str(backup_list))
if type(backup_list) == type(list()): # support map and list format as well
backup_data = dict(backup_list[i:i+2] for i in range(0, len(backup_list), 2))
else:
backup_data = backup_list
if (not 'snapshotName' in backup_data) or backup_data['snapshotName'] != snapshot_name:
snapshot = backup_data['snapshotName']
Logger.info(format("Snapshot name: {snapshot}, wait until {snapshot_name} will be available."))
time.sleep(time_interval)
continue
if backup_data['status'] == 'success':
Logger.info("Backup command status: success.")
failed = False
elif backup_data['status'] == 'failed':
Logger.info("Backup command status: failed.")
else:
Logger.info(format("Backup command is in progress... Sleep for {time_interval} seconds."))
time.sleep(time_interval)
continue
else:
Logger.info("Backup data is not found yet in details JSON response...")
time.sleep(time_interval)
continue
else:
if 'restorestatus' in json_data:
restorestatus_data = json_data['restorestatus']
if log_output:
Logger.info(str(restorestatus_data))
if (not 'snapshotName' in restorestatus_data) or restorestatus_data['snapshotName'] != format("snapshot.{snapshot_name}"):
snapshot = restorestatus_data['snapshotName']
Logger.info(format("Snapshot name: {snapshot}, wait until snapshot.{snapshot_name} will be available."))
time.sleep(time_interval)
continue
if restorestatus_data['status'] == 'success':
Logger.info("Restore command successfully finished.")
failed = False
elif restorestatus_data['status'] == 'failed':
Logger.info("Restore command failed.")
else:
Logger.info(format("Restore command is in progress... Sleep for {time_interval} seconds."))
time.sleep(time_interval)
continue
else:
Logger.info("Restore status data is not found yet in details JSON response...")
time.sleep(time_interval)
continue
except Exception:
traceback.print_exc()
time.sleep(time_interval)
continue
break
if failed:
raise Exception("Status Command failed.")
else:
Logger.info("Status command finished successfully.")
def __get_domain_name(url):
spltAr = url.split("://")
i = (0,1)[len(spltAr) > 1]
dm = spltAr[i].split('/')[0].split(':')[0].lower()
return dm
def __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores_path):
"""
Fill (and write to file) a JSON object with core data from state.json (znode).
"""
json_content={}
hosts_core_map={}
hosts_shard_map={}
core_host_map={}
core_data_map={}
with open(json_zk_state_path) as json_file:
json_data = json.load(json_file)
znode = json_data['znode']
data = json.loads(znode['data'])
collection_data = data[collection]
shards = collection_data['shards']
for shard in shards:
Logger.info(format("Found shard: {shard}"))
replicas = shards[shard]['replicas']
for replica in replicas:
core_data = replicas[replica]
core = core_data['core']
base_url = core_data['base_url']
state = core_data['state']
leader = core_data['leader'] if 'leader' in core_data else 'false'
domain = __get_domain_name(base_url)
if state == 'active' and leader == 'true':
if domain not in hosts_core_map:
hosts_core_map[domain]=[]
if domain not in hosts_shard_map:
hosts_shard_map[domain]=[]
if core not in core_data_map:
core_data_map[core]={}
hosts_core_map[domain].append(core)
hosts_shard_map[domain].append(shard)
core_host_map[core]=domain
core_data_map[core]['host']=domain
core_data_map[core]['node']=replica
if 'type' in core_data:
core_data_map[core]['type']=core_data['type']
else:
core_data_map[core]['type']='NRT'
core_data_map[core]['shard']=shard
Logger.info(format("Found leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
else:
Logger.info(format("Found non-leader/active replica: {replica} (core '{core}') in {shard} on {domain}"))
json_content[HOST_CORES]=hosts_core_map
json_content[CORE_HOST]=core_host_map
json_content[HOST_SHARDS]=hosts_shard_map
json_content[CORE_DATA]=core_data_map
with open(json_host_cores_path, 'w') as outfile:
json.dump(json_content, outfile)
return json_content
def __read_host_cores_from_file(json_host_cores_path):
"""
Read host cores from file, can be useful if you do not want to regenerate host core data (with that you can generate your own host core pairs for restore)
"""
with open(json_host_cores_path) as json_file:
host_cores_json_data = json.load(json_file)
return host_cores_json_data
def get_host_cores_for_collection(backup=True):
"""
Get core details to an object and write them to a file as well. Backup data will be used during restore.
:param backup: if enabled, save file into backup_host_cores.json, otherwise use restore_host_cores.json
:return: detailed json about the cores
"""
request_path = 'admin/zookeeper?wt=json&detail=true&path=%2Fclusterstate.json&view=graph'
json_folder = format("{index_location}")
json_zk_state_path = format("{json_folder}/zk_state.json")
if backup:
json_host_cores_path = format("{json_folder}/backup_host_cores.json")
else:
json_host_cores_path = format("{json_folder}/restore_host_cores.json")
api_request = create_solr_api_request_command(request_path, output=json_zk_state_path)
Execute(api_request, user=params.infra_solr_user)
return __read_host_cores_from_file(json_host_cores_path) if skip_generate_restore_host_cores \
else __read_host_cores_from_clusterstate_json(json_zk_state_path, json_host_cores_path)
def read_backup_json():
with open(format("{index_location}/backup_host_cores.json")) as json_file:
json_data = json.load(json_file)
return json_data
def create_core_pairs(original_cores, new_cores):
"""
Create core pairss from the original and new cores (backups -> restored ones), use alphabetic order
"""
core_pairs_data=[]
if len(new_cores) < len(original_cores):
raise Exception("Old collection core size is: " + str(len(new_cores)) +
". You will need at least: " + str(len(original_cores)))
else:
for index, core_data in enumerate(original_cores):
value={}
value['src_core']=core_data[0]
value['src_host']=core_data[1]
value['target_core']=new_cores[index][0]
value['target_host']=new_cores[index][1]
core_pairs_data.append(value)
with open(format("{index_location}/restore_core_pairs.json"), 'w') as outfile:
json.dump(core_pairs_data, outfile)
return core_pairs_data
def sort_core_host_pairs(host_core_map):
"""
Sort host core map by key
"""
core_host_pairs=[]
for key in sorted(host_core_map):
core_host_pairs.append((key, host_core_map[key]))
return core_host_pairs
def is_ip(addr):
try:
socket.inet_aton(addr)
return True
except socket.error:
return False
def resolve_ip_to_hostname(ip):
try:
host_name = socket.gethostbyaddr(ip)[0].lower()
Logger.info(format("Resolved {ip} to {host_name}"))
fqdn_name = socket.getaddrinfo(host_name, 0, 0, 0, 0, socket.AI_CANONNAME)[0][3].lower()
return host_name if host_name == fqdn_name else fqdn_name
except socket.error:
pass
return ip
def create_command(command):
"""
Create hdfs command. Append kinit to the command if required.
"""
kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
return kinit_cmd + command
def execute_commad(command):
"""
Run hdfs command by infra-solr user
"""
return call(command, user=params.infra_solr_user, timeout=300)
def move_hdfs_folder(source_dir, target_dir):
cmd=create_command(format("hdfs dfs -mv {source_dir} {target_dir}"))
returncode, stdout = execute_commad(cmd)
if returncode:
raise Fail("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
return stdout.strip()
def check_hdfs_folder_exists(hdfs_dir):
"""
Check that hdfs folder exists or not
"""
cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
returncode, stdout = execute_commad(cmd)
if returncode:
return False
return True
def check_folder_exists(dir):
"""
Check that folder exists or not
"""
returncode, stdout = call(format("test -d {dir}"), user=params.infra_solr_user, timeout=300)
if returncode:
return False
return True
|