HEX
Server: Apache
System: Linux 162-240-236-42.bluehost.com 3.10.0-1160.114.2.el7.x86_64 #1 SMP Wed Mar 20 15:54:52 UTC 2024 x86_64
User: bt667 (1004)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/libexec/kcare/python/kcarectl/config_handlers.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import json
import os
import re

from . import config, constants, http_utils, log_utils, utils
from .py23 import ConfigParser

if False:  # pragma: no cover
    from typing import Dict, List, Optional, Set  # noqa: F401

# Path of the kcare configuration file that this module reads and rewrites.
CONFIG = '/etc/sysconfig/kcare/kcare.conf'
# Names of the options that set_feature_flags_from_cache() is allowed to
# override from cached feature flags; any other cached flag is ignored.
FEATURE_FLAGS_WHITELIST = [
    # signatures-related things
    'USE_CONTENT_FILE_V3',
    'FORCE_JSON_SIG_V3',
    # kernel module - related things
    'ENABLE_CRASHREPORTER',
    'KCORE_OUTPUT',
    'KMSG_OUTPUT',
    # checkin-related things
    'SEND_PERF_METRICS',
    # anomaly reports
    'KERNEL_ANOMALY_REPORT_ENABLE',
]

# Filled by set_settings_from_config_file(); options listed here take priority
# over feature flags in set_feature_flags_from_cache().
_CONFIG_OPTIONS = set()  # type: Set[str]   # options were read from config file


def bool_converter(value):  # type: (str) -> bool
    """Interpret a config string as a boolean.

    The comparison is case-insensitive; only ``1``, ``TRUE``, ``YES`` and
    ``Y`` count as true. Anything else (including the empty string or a
    value with surrounding whitespace) is false.
    """
    truthy = ('1', 'TRUE', 'YES', 'Y')
    normalized = value.upper()
    return normalized in truthy


# Registry of every option recognised in the config file.
# get_config_settings() applies the converter to the raw string read from the
# file; update_config_from_args() rejects names missing from this table and
# uses the converter only to validate a new value before it is written.
# A converter of None means the raw string is kept unchanged.
POSSIBLE_CONFIG_OPTIONS = {
    # name: convert function (could be None)
    'AFTER_UPDATE_COMMAND': lambda v: v.strip(),
    'AUTO_STICKY_PATCHSET': None,
    'AUTO_UPDATE': bool_converter,
    'AUTO_UPDATE_DELAY': None,
    'BEFORE_UPDATE_COMMAND': lambda v: v.strip(),
    'CHECK_SSL_CERTS': bool_converter,
    'ENABLE_CRASHREPORTER': bool_converter,
    'FORCE_GID': None,
    'FORCE_IPV4': bool_converter,
    'FORCE_IPV6': bool_converter,
    'FORCE_JSON_SIG_V3': bool_converter,
    'HTTP_TIMEOUT': int,
    'HTTP_UPLOAD_TIMEOUT': int,
    'IGNORE_FEATURE_FLAGS': bool_converter,
    'IGNORE_UNKNOWN_KERNEL': bool_converter,
    'KCORE_OUTPUT': bool_converter,
    'KCORE_OUTPUT_SIZE': int,
    # directory and URL values are stored without trailing slashes
    'KDUMPS_DIR': lambda v: v.rstrip('/'),
    'KERNEL_ANOMALY_REPORT_ENABLE': bool_converter,
    'KERNEL_ANOMALY_REPORT_MAX_SIZE_BYTES': int,
    'KMSG_OUTPUT': bool_converter,
    'LIBCARE_DISABLED': bool_converter,
    'LIBCARE_PIDLOGS_MAX_TOTAL_SIZE_MB': int,
    'LIBCARE_SOCKET_TIMEOUT': int,
    'LIB_AUTO_UPDATE': bool_converter,
    # an empty PATCH_LEVEL is normalised to None
    'PATCH_LEVEL': lambda v: v or None,
    'PATCH_METHOD': str.upper,
    'PATCH_SERVER': lambda v: v.rstrip('/'),
    'PATCH_SERVER_IPV6': lambda v: v.rstrip('/'),
    'PATCH_TYPE': str.lower,
    'PREFIX': None,
    'PREV_PATCH_TYPE': str.lower,
    'REGISTRATION_URL': lambda v: v.rstrip('/'),
    'REGISTRATION_URL_IPV6': lambda v: v.rstrip('/'),
    'PRINT_LEVEL': int,
    'REPORT_FQDN': bool_converter,
    'SILENCE_ERRORS': bool_converter,
    'STATUS_CHANGE_GAP': int,
    'STICKY_PATCH': str.upper,
    'STICKY_PATCHSET': None,
    'UPDATE_DELAY': None,
    'UPDATE_POLICY': str.upper,
    'UPDATE_SYSCTL_CONFIG': bool_converter,
    # comma-separated value -> list of stripped, lower-cased items
    'USERSPACE_PATCHES': lambda v: [ptch.strip().lower() for ptch in v.split(',')],
    'USE_CONTENT_FILE_V3': bool_converter,
    'KERNEL_VERSION_FILE': None,
    'KCARE_UNAME_FILE': None,
    'SUCCESS_TIMEOUT': int,
    'SEND_PERF_METRICS': bool_converter,
}  # pragma: no py2 cover


def update_config(**kwargs):
    """Set or remove options in the config file at ``CONFIG``.

    Each keyword argument is an option name mapped to its new value:

    * a non-None value replaces the first line starting with ``NAME=`` or
      ``NAME `` by ``NAME = value``, or is appended when no such line exists;
    * ``None`` removes the first matching line; if the option is not present
      nothing is written for it.

    The whole file is rewritten through ``utils.atomic_write``.
    Errors from opening or reading ``CONFIG`` propagate to the caller.
    """
    # Context manager so the handle is released even if reading fails.
    with open(CONFIG) as cf:
        lines = cf.readlines()

    for prop, value in kwargs.items():
        prefixes = (prop + '=', prop + ' ')
        new_line = None if value is None else prop + ' = ' + str(value) + '\n'
        for i, line in enumerate(lines):
            if line.startswith(prefixes):
                if new_line is None:
                    # Safe to delete while iterating: we leave the loop right away.
                    del lines[i]
                else:
                    lines[i] = new_line
                break
        else:
            # Option not present. Removing a missing option is a no-op; it must
            # not leave a literal "NAME = None" line behind.
            if new_line is not None:
                # Keep the new option on its own line when the file does not
                # end with a newline.
                if lines and not lines[-1].endswith('\n'):
                    lines[-1] += '\n'
                lines.append(new_line)
    utils.atomic_write(CONFIG, ''.join(lines))


def update_config_from_args(params):  # type: (List[str]) -> None
    """Validate ``KEY=VALUE`` strings and write them to the config file.

    Every item must contain exactly one ``=`` with a non-empty key; an empty
    value (``KEY=``) is passed on as ``None``. Keys must be listed in
    ``POSSIBLE_CONFIG_OPTIONS``, and non-empty values must be accepted by the
    option's converter (when it has one). The validated mapping is handed to
    ``update_config``.

    :raises SystemExit: on a malformed item, an unknown key or a value the
        converter rejects.
    """
    key_value_re = re.compile(r'^([^=]+)=([^=]*)$')
    requested = {}
    for raw in params:
        parsed = key_value_re.match(raw)
        if not parsed:
            raise SystemExit('Invalid parameter format: %s. Format should be KEY=VALUE' % raw)
        name, text = parsed.groups()
        # an empty right-hand side is recorded as None
        requested[name] = text or None

    unknown = sorted(set(requested) - set(POSSIBLE_CONFIG_OPTIONS))
    if unknown:
        raise SystemExit('Unknown parameter: %s' % ', '.join(unknown))

    for name, text in requested.items():
        converter = POSSIBLE_CONFIG_OPTIONS[name]
        if text is not None and converter is not None:
            # the converted result is discarded: this is a validation run only
            try:
                converter(text)
            except Exception:
                raise SystemExit('Bad value for %s: %s' % (name, text))

    update_config(**requested)


class FakeSecHead(object):
    """File wrapper that emits a synthetic ``[asection]`` header first.

    The wrapped file has no section header, so this object hands out the line
    ``[asection]\\n`` once, before any content of ``fp``, both through
    ``readline()`` and through iteration.
    """

    def __init__(self, fp):
        self.fp = fp
        # Header still to be emitted; None once it has been handed out.
        self.sechead = '[asection]\n'  # type: Optional[str]

    def readline(self):  # pragma: no py3 cover
        """Return the pending header once, then delegate to ``fp.readline()``."""
        header = self.sechead
        if not header:
            return self.fp.readline()
        self.sechead = None
        return header

    def __iter__(self):  # pragma: no py2 cover
        """Yield the pending header (if any), then every line of ``fp``."""
        pending = self.sechead
        if pending:
            yield pending
            # cleared only after the consumer asks for the next line
            self.sechead = None
        for line in self.fp:
            yield line


def get_config_settings():
    """Read the config file and return its recognised options as a dict.

    The file at ``CONFIG`` is parsed with ``ConfigParser`` through
    ``FakeSecHead``, which supplies the ``[asection]`` header the file lacks.
    Every option listed in ``POSSIBLE_CONFIG_OPTIONS`` that is present is
    passed through its converter (if any) and stored under its name.

    Side effect: for ``HTTP_PROXY`` / ``HTTPS_PROXY``, a non-empty value from
    the file is exported to ``os.environ`` unless
    ``http_utils.get_proxy_from_env`` already reports a proxy for that scheme.

    :return: dict of option name -> converted value; an empty dict when the
        file cannot be opened or parsed.
    :raises Exception: whatever a converter raises on a bad value (for
        example ``ValueError`` from ``int``) propagates to the caller.
    """
    result = {}
    cp = ConfigParser(defaults={'HTTP_PROXY': '', 'HTTPS_PROXY': ''})

    try:
        # The context manager closes the file as soon as parsing is done (or
        # fails). The local name also avoids shadowing the imported `config`
        # module.
        with open(CONFIG) as config_fp:
            config_file = FakeSecHead(config_fp)
            if constants.PY2:  # pragma: no py3 cover
                cp.readfp(config_file)
            else:  # pragma: no py2 cover
                cp.read_file(config_file)
    except Exception:
        # Best effort: an unreadable or malformed file means "no settings".
        return {}

    def read_var(name, default=None, convert=None):
        # Any lookup failure (typically a missing option) falls back to `default`.
        try:
            value = cp.get('asection', name)
        except Exception:
            value = default
        if value is not None:
            if convert:
                value = convert(value)

            result[name] = value

    for scheme, variable in [('http', 'HTTP_PROXY'), ('https', 'HTTPS_PROXY')]:
        # environment settings take precedence over kcare.config ones
        if not http_utils.get_proxy_from_env(scheme):
            proxy = cp.get('asection', variable)
            if proxy:
                os.environ[variable] = proxy

    for var_name, convert in POSSIBLE_CONFIG_OPTIONS.items():
        read_var(var_name, convert=convert)

    return result


def set_settings_from_config_file():
    """Load the config file into the ``config`` module's namespace.

    Also resets ``_CONFIG_OPTIONS`` to the set of option names found in the
    file.
    """
    _CONFIG_OPTIONS.clear()
    loaded = get_config_settings()

    # publish every loaded option as an attribute of the config module
    vars(config).update(loaded)

    # memorize keys defined in the config file
    _CONFIG_OPTIONS.update(loaded.keys())


def convert_headers_to_feature_flags(headers):  # type: (Dict[str, str]) -> Dict[str, bool]
    """
    Extract feature flags from headers whose name starts with 'KC-Flag-'.

    Header names are compared in upper case. The leading 'KC-FLAG-' prefix is
    removed (only the prefix, not later occurrences of the same text) and the
    remaining dashes are replaced with underscores.
    For example:
    'KC-Flag-Some-Value' -> 'SOME_VALUE'

    The header value is parsed as an integer and converted to bool. A value
    that cannot be parsed is logged and skipped, so one bad header does not
    discard the others.

    :param headers: mapping of header name to header value
    :return: dict {'SOME_VALUE': bool, ...}
    """
    prefix = 'KC-FLAG-'
    # there is no dict comprehension in python 2.6
    flags = {}
    for hdr_name, hdr_value in headers.items():
        hdr_name = hdr_name.upper()
        if not hdr_name.startswith(prefix):
            continue

        # Slice instead of str.replace(): replace() would also remove the
        # prefix text if it appeared again inside the flag name.
        param_name = hdr_name[len(prefix):].replace('-', '_')
        try:
            flags[param_name] = bool(int(hdr_value))
        except (TypeError, ValueError):
            # TypeError covers non-string values such as None.
            log_utils.kcarelog.error('Invalid feature flag header value %s: %s', hdr_name, hdr_value)

    return flags


def set_feature_flags_from_headers(headers):  # type: (Dict[str, str]) -> None
    """Cache the feature flags found in ``headers`` and apply them.

    The cache is always refreshed; the flags are applied only when
    ``config.IGNORE_FEATURE_FLAGS`` is not set.
    """
    save_feature_flags_cache(headers)
    if config.IGNORE_FEATURE_FLAGS:
        return
    set_feature_flags_from_cache()


@utils.catch_errors(logger=log_utils.logwarn)
def save_feature_flags_cache(headers):  # type: (Dict[str, str]) -> None
    """Write the feature flags from ``headers`` to the cache file as JSON.

    The flags come from ``convert_headers_to_feature_flags`` and are written
    to ``constants.FEATURE_FLAGS_CACHE`` through ``utils.atomic_write``.
    """
    # NOTE(review): the catch_errors decorator presumably logs failures via
    # log_utils.logwarn instead of raising - confirm in utils.
    serialized = json.dumps(convert_headers_to_feature_flags(headers))
    utils.atomic_write(constants.FEATURE_FLAGS_CACHE, content=serialized)


@utils.catch_errors(logger=log_utils.logwarn)
def set_feature_flags_from_cache():  # type: () -> None
    """
    Apply cached feature flags to the ``config`` module.

    Reads the JSON file at ``constants.FEATURE_FLAGS_CACHE`` (doing nothing if
    it does not exist). A flag is applied only when its name is listed in
    ``FEATURE_FLAGS_WHITELIST`` and the option was not set in the config
    file. Each applied flag is written into ``config.__dict__`` and logged.
    :return: None
    """
    cache_path = constants.FEATURE_FLAGS_CACHE
    if not os.path.exists(cache_path):
        return

    with open(cache_path) as cache_file:
        cached_flags = json.load(cache_file)

    for flag_name, flag_value in cached_flags.items():
        # Flags outside the whitelist are ignored, and a value from the
        # config file has higher priority than a feature flag.
        if flag_name in FEATURE_FLAGS_WHITELIST and flag_name not in _CONFIG_OPTIONS:
            config.__dict__[flag_name] = flag_value
            log_utils.kcarelog.info('feature flags config override: %s=%s', flag_name, flag_value)