File: //usr/libexec/kcare/python/kcarectl/config_handlers.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
import json
import os
import re
from . import config, constants, http_utils, log_utils, utils
from .py23 import ConfigParser
if False: # pragma: no cover
from typing import Dict, List, Optional, Set # noqa: F401
CONFIG = '/etc/sysconfig/kcare/kcare.conf'
FEATURE_FLAGS_WHITELIST = [
# signatures-related things
'USE_CONTENT_FILE_V3',
'FORCE_JSON_SIG_V3',
# kernel module - related things
'ENABLE_CRASHREPORTER',
'KCORE_OUTPUT',
'KMSG_OUTPUT',
# checkin-related things
'SEND_PERF_METRICS',
# anomaly reports
'KERNEL_ANOMALY_REPORT_ENABLE',
]
_CONFIG_OPTIONS = set() # type: Set[str] # options were read from config file
def bool_converter(value): # type: (str) -> bool
return value.upper() in ('1', 'TRUE', 'YES', 'Y')
POSSIBLE_CONFIG_OPTIONS = {
# name: convert function (could be None)
'AFTER_UPDATE_COMMAND': lambda v: v.strip(),
'AUTO_STICKY_PATCHSET': None,
'AUTO_UPDATE': bool_converter,
'AUTO_UPDATE_DELAY': None,
'BEFORE_UPDATE_COMMAND': lambda v: v.strip(),
'CHECK_SSL_CERTS': bool_converter,
'ENABLE_CRASHREPORTER': bool_converter,
'FORCE_GID': None,
'FORCE_IPV4': bool_converter,
'FORCE_IPV6': bool_converter,
'FORCE_JSON_SIG_V3': bool_converter,
'HTTP_TIMEOUT': int,
'HTTP_UPLOAD_TIMEOUT': int,
'IGNORE_FEATURE_FLAGS': bool_converter,
'IGNORE_UNKNOWN_KERNEL': bool_converter,
'KCORE_OUTPUT': bool_converter,
'KCORE_OUTPUT_SIZE': int,
'KDUMPS_DIR': lambda v: v.rstrip('/'),
'KERNEL_ANOMALY_REPORT_ENABLE': bool_converter,
'KERNEL_ANOMALY_REPORT_MAX_SIZE_BYTES': int,
'KMSG_OUTPUT': bool_converter,
'LIBCARE_DISABLED': bool_converter,
'LIBCARE_PIDLOGS_MAX_TOTAL_SIZE_MB': int,
'LIBCARE_SOCKET_TIMEOUT': int,
'LIB_AUTO_UPDATE': bool_converter,
'PATCH_LEVEL': lambda v: v or None,
'PATCH_METHOD': str.upper,
'PATCH_SERVER': lambda v: v.rstrip('/'),
'PATCH_SERVER_IPV6': lambda v: v.rstrip('/'),
'PATCH_TYPE': str.lower,
'PREFIX': None,
'PREV_PATCH_TYPE': str.lower,
'REGISTRATION_URL': lambda v: v.rstrip('/'),
'REGISTRATION_URL_IPV6': lambda v: v.rstrip('/'),
'PRINT_LEVEL': int,
'REPORT_FQDN': bool_converter,
'SILENCE_ERRORS': bool_converter,
'STATUS_CHANGE_GAP': int,
'STICKY_PATCH': str.upper,
'STICKY_PATCHSET': None,
'UPDATE_DELAY': None,
'UPDATE_POLICY': str.upper,
'UPDATE_SYSCTL_CONFIG': bool_converter,
'USERSPACE_PATCHES': lambda v: [ptch.strip().lower() for ptch in v.split(',')],
'USE_CONTENT_FILE_V3': bool_converter,
'KERNEL_VERSION_FILE': None,
'KCARE_UNAME_FILE': None,
'SUCCESS_TIMEOUT': int,
'SEND_PERF_METRICS': bool_converter,
} # pragma: no py2 cover
def update_config(**kwargs):
cf = open(CONFIG)
lines = cf.readlines()
cf.close()
for prop, value in kwargs.items():
updated = False
prop_eq = prop + '='
prop_sp = prop + ' '
for i in range(len(lines)):
if lines[i].startswith(prop_eq) or lines[i].startswith(prop_sp):
if value is None:
del lines[i]
else:
lines[i] = prop + ' = ' + str(value) + '\n'
updated = True
break
if not updated:
lines.append(prop + ' = ' + str(value) + '\n')
utils.atomic_write(CONFIG, ''.join(lines))
def update_config_from_args(params): # type: (List[str]) -> None
params_for_update = {}
pattern = re.compile(r'^([^=]+)=([^=]*)$')
for param in params:
match = pattern.match(param)
if match:
key, value = match.groups()
if not value:
value = None
else:
raise SystemExit('Invalid parameter format: %s. Format should be KEY=VALUE' % param)
params_for_update[key] = value
unknown_params = set(params_for_update) - set(POSSIBLE_CONFIG_OPTIONS)
if unknown_params:
raise SystemExit('Unknown parameter: %s' % ', '.join(sorted(unknown_params)))
for var_name, value in params_for_update.items():
convert = POSSIBLE_CONFIG_OPTIONS[var_name]
if value is None or convert is None:
continue
try:
convert(value)
except Exception:
raise SystemExit('Bad value for %s: %s' % (var_name, value))
update_config(**params_for_update)
class FakeSecHead(object):
def __init__(self, fp):
self.fp = fp
self.sechead = '[asection]\n' # type: Optional[str]
def readline(self): # pragma: no py3 cover
if self.sechead:
try:
return self.sechead
finally:
self.sechead = None
else:
return self.fp.readline()
def __iter__(self): # pragma: no py2 cover
if self.sechead:
yield self.sechead
self.sechead = None
for line in self.fp:
yield line
def get_config_settings():
result = {}
cp = ConfigParser(defaults={'HTTP_PROXY': '', 'HTTPS_PROXY': ''})
try:
config = FakeSecHead(open(CONFIG))
if constants.PY2: # pragma: no py3 cover
cp.readfp(config)
else: # pragma: no py2 cover
cp.read_file(config)
except Exception:
return {}
def read_var(name, default=None, convert=None):
try:
value = cp.get('asection', name)
except Exception:
value = default
if value is not None:
if convert:
value = convert(value)
result[name] = value
for scheme, variable in [('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY')]:
# environment settings take precedence over kcare.config ones
if not http_utils.get_proxy_from_env(scheme):
proxy = cp.get('asection', variable)
if proxy:
os.environ[variable] = proxy
for var_name, convert in POSSIBLE_CONFIG_OPTIONS.items():
read_var(var_name, convert=convert)
return result
def set_settings_from_config_file():
_CONFIG_OPTIONS.clear()
settings = get_config_settings()
config.__dict__.update(settings)
# memorize keys defined in the config file
_CONFIG_OPTIONS.update(settings)
def convert_headers_to_feature_flags(headers): # type: (Dict[str, str]) -> Dict[str, bool]
"""
Checking headers for feature flags which start with 'KC-Flag-' and
reformat it to dictionary with keys in upper case and without 'KC-Flag-' prefix
and dashes replaced with underscores. For unification all header keys are checked in upper case.
For example:
'KC-Flag-Some-Value' -> 'SOME_VALUE'
:return: dict {'SOME_VALUE': bool, ...}
"""
# there is no dict comprehension in python 2.6
flags = {}
for hdr_name, hdr_value in headers.items():
hdr_name = hdr_name.upper()
if not hdr_name.startswith('KC-FLAG-'):
continue
param_name = hdr_name.replace('KC-FLAG-', '').replace('-', '_')
try:
flags[param_name] = bool(int(hdr_value))
except ValueError:
log_utils.kcarelog.error('Invalid feature flag header value %s: %s', hdr_name, hdr_value)
return flags
def set_feature_flags_from_headers(headers): # type: (Dict[str, str]) -> None
save_feature_flags_cache(headers)
if not config.IGNORE_FEATURE_FLAGS:
set_feature_flags_from_cache()
@utils.catch_errors(logger=log_utils.logwarn)
def save_feature_flags_cache(headers): # type: (Dict[str, str]) -> None
feature_flags = convert_headers_to_feature_flags(headers)
utils.atomic_write(constants.FEATURE_FLAGS_CACHE, content=json.dumps(feature_flags))
@utils.catch_errors(logger=log_utils.logwarn)
def set_feature_flags_from_cache(): # type: () -> None
"""
Set global variables using feature flag from cached values received with patchserver headers.
Checks that option is allowed by whitelist and update global variable
using globals()
:return: None
"""
if not os.path.exists(constants.FEATURE_FLAGS_CACHE):
return
with open(constants.FEATURE_FLAGS_CACHE) as f:
feature_flags = json.load(f)
for key, value in feature_flags.items():
if key not in FEATURE_FLAGS_WHITELIST:
continue
if key in _CONFIG_OPTIONS:
# param was already set from config file, it has higher priority
continue
config.__dict__[key] = value
log_utils.kcarelog.info('feature flags config override: %s=%s', key, value)