HEX
Server: Apache
System: Linux 162-240-236-42.bluehost.com 3.10.0-1160.114.2.el7.x86_64 #1 SMP Wed Mar 20 15:54:52 UTC 2024 x86_64
User: bt667 (1004)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/libexec/kcare/python/kcarectl/anomaly.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import io
import json
import os
import shlex
import tarfile
import time
import uuid
from tempfile import NamedTemporaryFile

from kcarectl import auth, config, http_utils, ipv6_support, kcare, log_utils, utils
from kcarectl.process_utils import run_command

if False:  # pragma: no cover
    # Typing-only imports: the `if False` guard keeps them out of runtime
    # (a py2-compatible stand-in for typing.TYPE_CHECKING) while static type
    # checkers still process the branch and see the names.
    from typing import Any, Dict, List, Optional  # noqa: F401

    from typing_extensions import Self  # noqa: F401


class DataPackage(object):
    """based on DataPackage from eportal

    Collects diagnostic artifacts (command output, files on disk, JSON
    documents) into a compressed tar archive.  Must be used as a context
    manager: the archive is created in __enter__ and finalized in __exit__.
    Collection errors are buffered instead of raised and are written to
    'errors.log' inside the archive on exit.
    """

    def __init__(self):
        # type: () -> None
        # The tar handle is created by __enter__; None means "not entered yet".
        self._tar = None  # type: Optional[tarfile.TarFile]
        # Non-fatal errors collected while adding entries; flushed on exit.
        self._errors_buffer = []  # type: List[str]
        # Running total of uncompressed entry sizes, used for the payload limit.
        self._total_payload_size = 0

    @property
    def archive_path(self):
        # type: () -> str
        """Filesystem path of the archive (requires an entered context)."""
        tar = self._ensure_tar_created()
        return str(tar.name)

    def add_stdout(self, arcname, cmd):
        # type: (str, str) -> None
        """Run *cmd* and store its stdout in the archive under *arcname*.

        Any stderr output or execution failure is recorded in the error
        buffer; whatever stdout was captured is still archived.
        """
        stdout = None
        stderr = None

        try:
            _, stdout, stderr = run_command(shlex.split(cmd), catch_stdout=True, catch_stderr=True)
        except Exception as e:
            stderr = str(e)

        if stderr:
            self.log_error('failed to dump stdout of {0}:\n{1}'.format(cmd, stderr))

        if stdout is not None:
            self.add_file(arcname, data_bytes=utils.bstr(stdout, encoding='utf-8'))

    def add_file(self, arcname, src_path=None, data_bytes=None, skip_limit_check=False):
        # type: (str, Optional[str], Optional[bytes], bool) -> None
        """Add a file from disk (*src_path*) or in-memory bytes under *arcname*.

        The entry is skipped (with a buffered error) when the source file is
        missing, when the filesystem lacks free space for it, or when it
        would exceed the total payload limit (unless *skip_limit_check*).

        :raises ValueError: if neither src_path nor data_bytes is given
        """
        if src_path is None and data_bytes is None:
            raise ValueError('No src_path or data_bytes provided')

        tar = self._ensure_tar_created()
        entry_size = 0
        if src_path is not None:
            if not os.path.exists(src_path):
                self.log_error('file not found: {0}'.format(src_path))
                return

            entry_size = os.path.getsize(src_path)
        else:
            entry_size = len(data_bytes)  # type: ignore[arg-type]

        if not self._check_required_space(entry_size):
            self.log_error('no available space to store: {0}'.format(arcname))
            return

        if not skip_limit_check and not self._check_total_payload_limit(entry_size):
            self.log_error('skip due to total payload size limit: {0}'.format(arcname))
            return

        try:
            if src_path:
                tar.add(src_path, arcname=arcname)
            else:
                # In-memory entries need an explicit TarInfo with the size set.
                info = tarfile.TarInfo(arcname)
                info.size = entry_size
                tar.addfile(info, io.BytesIO(data_bytes))  # type: ignore[arg-type]

            self._total_payload_size += entry_size
        except Exception as e:
            self.log_error('failed to store {0}: {1}'.format(arcname, e))

    def add_json(self, arcname, data):
        # type: (str, Dict[str, Any]) -> None
        """Serialize *data* to indented JSON and archive it under *arcname*."""
        try:
            data_bytes = utils.bstr(json.dumps(data, indent=4), encoding='utf-8')
        except TypeError as e:
            # json.dumps raises TypeError for non-serializable values.
            self.log_error('failed to dump {0}:\n{1}'.format(arcname, e))
            return

        self.add_file(arcname, data_bytes=data_bytes)

    def _check_required_space(self, entry_size):
        # type: (int) -> bool
        """Whether the archive's filesystem has room for *entry_size* bytes."""
        # here we simplify the check and ignore that the compressed file size will be less
        statvfs = os.statvfs(self.archive_path)
        return statvfs.f_frsize * statvfs.f_bfree > entry_size

    def _check_total_payload_limit(self, entry_size):
        # type: (int) -> bool
        """Whether adding *entry_size* bytes stays under the configured cap."""
        # here we simplify the check and ignore that the compressed file size will be less
        return config.KERNEL_ANOMALY_REPORT_MAX_SIZE_BYTES > self._total_payload_size + entry_size

    def log_error(self, error_msg):
        # type: (str) -> None
        """Log *error_msg* and buffer it for errors.log inside the archive."""
        error_msg = error_msg.strip()
        log_utils.logerror(error_msg, print_msg=False)
        self._errors_buffer.append(error_msg)

    def __enter__(self):
        # type: () -> Self
        # Prefer the strongest compression (xz), falling back to bz2 and then
        # gz when the Python build lacks the corresponding compression module.
        for compression_mode in ('w:xz', 'w:bz2', 'w:gz'):  # pragma: no branch
            tmpfile = NamedTemporaryFile(suffix='.tar.{0}'.format(compression_mode[2:]), delete=False)
            tmpfile.close()
            try:
                log_utils.loginfo('Creating DataPackage: {0}'.format(tmpfile.name), print_msg=False)
                self._tar = tarfile.open(name=tmpfile.name, mode=compression_mode)
                return self
            except Exception as err:  # pragma: no cover
                # Remove the temp file of the failed attempt before retrying.
                if os.path.exists(tmpfile.name):
                    os.unlink(tmpfile.name)

                self._tar = None
                # Only CompressionError means "try the next mode"; anything
                # else is unexpected and propagates.
                if not isinstance(err, tarfile.CompressionError):
                    raise

        raise tarfile.CompressionError('No supported compression method found')  # pragma: no cover

    def __exit__(self, exc_type, exc_val, exc_tb):
        # type: (Optional[type[BaseException]], Optional[BaseException], Any) -> bool
        # Flush buffered errors into the archive before closing it; the limit
        # check is skipped so the error log is never dropped.
        if self._errors_buffer:  # pragma: no branch
            errors = '\n'.join(self._errors_buffer) + '\n'
            self.add_file('errors.log', data_bytes=utils.bstr(errors), skip_limit_check=True)

        if self._tar:  # pragma: no branch
            self._tar.close()
            if exc_val:
                # A partial archive is useless on failure: delete it and let
                # the exception propagate (return False).
                self.remove_archive()
                return False

        return True

    @utils.catch_errors(logger=log_utils.logwarn)
    def remove_archive(self):
        # type: () -> None
        """Delete the archive file from disk (failures are only logged)."""
        if self._tar and os.path.exists(self.archive_path):  # pragma: no branch
            os.unlink(self.archive_path)

    def _ensure_tar_created(self):
        # type: () -> tarfile.TarFile
        """Return the open tar handle.

        :raises RuntimeError: if the context manager was never entered
        """
        if not self._tar:
            raise RuntimeError('DataPackage should be used as a context manager')
        return self._tar


@utils.catch_errors(logger=log_utils.logwarn)
def send_data_package(data_package):
    # type: (DataPackage) -> str
    """Send the DataPackage archive to the patch server.

    :param data_package: DataPackage instance to send
    :return: Upload name (package identifier)
    """

    archive = data_package.archive_path
    filename = os.path.basename(archive)

    # Keep the full multi-part extension (.tar.xz/.tar.bz2/.tar.gz): take
    # everything starting from the FIRST dot, not just the last suffix.
    if '.' in filename:
        suffix = filename[filename.index('.'):]
    else:
        suffix = ''

    # A random UUID makes the upload name unique on the server side.
    upload_name = '{0}{1}'.format(uuid.uuid4(), suffix)
    upload_url = '{0}/upload/kernel-anomaly/{1}'.format(ipv6_support.get_patch_server(), upload_name)

    http_utils.upload_file(
        archive,
        upload_url=upload_url,
        auth_string=auth.get_http_auth_string(),
    )

    return upload_name


def make_manifest_json():  # pragma: no cover
    # type: () -> Dict[str, Any]
    """Build the manifest entry describing a kernel-anomaly report archive."""
    manifest = {}  # type: Dict[str, Any]
    manifest["schema_version"] = 1
    manifest["type"] = "kernel-anomaly"
    manifest["time_created"] = int(time.time())
    return manifest


def copy_recent_files(files, data_package, archive_prefix):
    # type: (List[str], DataPackage, str) -> None
    """adds recent files for the last hour to the given data package starting from the newest one"""

    cutoff = time.time() - 3600  # only files created within the last hour
    # sort_files_by_ctime yields (path, ctime) pairs newest-first (per the
    # docstring above), so we can stop at the first entry past the cutoff.
    for file_path, created_at in kcare.sort_files_by_ctime(files):
        if created_at < cutoff:
            break

        # Flatten the path into a single archive member name under the prefix.
        entry_name = '{0}/{1}'.format(archive_prefix, file_path.replace('/', '_'))
        data_package.add_file(entry_name, src_path=file_path)


@utils.catch_errors(logger=log_utils.logwarn)
def prepare_kernel_anomaly_report(server_info):  # pragma: no cover
    # type: (Dict[str, Any]) -> DataPackage
    """Collect system logs, package lists and kdump artifacts into an archive.

    :param server_info: agent state dict, stored as server_info.json
    :return: the populated DataPackage (archive finalized on disk)
    """

    data_package = DataPackage()

    with data_package:
        # System log: /var/log/messages on RHEL-family, /var/log/syslog on
        # Debian-family; only the tail is taken to bound the archive size.
        if os.path.exists('/var/log/messages'):
            data_package.add_stdout('messages', 'tail -n10000 /var/log/messages')

        if os.path.exists('/var/log/syslog'):
            data_package.add_stdout('syslog', 'tail -n10000 /var/log/syslog')

        data_package.add_stdout('kcarectl.log', 'tail -n10000 /var/log/kcarectl.log')
        data_package.add_stdout('dmesg', 'dmesg')
        data_package.add_stdout('ls_var_cache_kcare', 'ls -lR /var/cache/kcare/')

        # Detect the package manager and build the installed-packages query.
        if os.path.exists('/usr/bin/rpm') or os.path.exists('/bin/rpm'):
            packages_cmd = r'rpm -q -a --queryformat="%{N}|%{V}-%{R}|%{arch}|%{INSTALLTIME:date}\n"'
        elif os.path.exists('/usr/bin/dpkg'):
            packages_cmd = r'/usr/bin/dpkg-query -W -f "${binary:Package}|${Version}|${Architecture}\n"'
            data_package.add_file('dpkg.log', src_path='/var/log/dpkg.log')
        else:
            packages_cmd = 'echo "unknown package manager"'

        data_package.add_stdout('packages.list', packages_cmd)

        # tar fails to add files from /proc directly so we first read them to memory
        with open('/proc/version') as f:
            data_package.add_file('proc_version', data_bytes=utils.bstr(f.read()))

        with open('/proc/modules') as f:
            data_package.add_file('proc_modules', data_bytes=utils.bstr(f.read()))

        data_package.add_file('kcare.conf', src_path='/etc/sysconfig/kcare/kcare.conf')
        data_package.add_json('server_info.json', server_info)
        data_package.add_json('manifest.json', make_manifest_json())

        # kdump
        data_package.add_file('kdump.conf', src_path='/etc/kdump.conf')
        data_package.add_stdout('ls_kdump', 'ls -lR {0}'.format(kcare.get_kdump_root()))
        # kdump/crashreporter collection is best-effort: failures are logged
        # into the archive's error buffer instead of aborting the report.
        try:
            copy_recent_files(kcare.list_kdump_txt_files(), data_package, 'kdump')
        except Exception as e:
            data_package.log_error('failed to copy kdumps:\n{0}'.format(e))

        try:
            copy_recent_files(kcare.list_crashreporter_log_files(), data_package, 'crashreporter')
        except Exception as e:
            data_package.log_error('failed to copy crashreporter artifacts:\n{0}'.format(e))

    return data_package


@utils.catch_errors(logger=log_utils.logwarn, default_return=False)
def detect_anomaly(server_info):
    # type: (Dict[str, Any]) -> bool
    """taken from eportal - anomalies::detect_agent_reboot"""

    reason = server_info['reason']  # type: str
    uptime = int(server_info['uptime'])
    patch_level = int(server_info.get('patch_level') or '-1')
    last_stop = int(server_info['last_stop'])
    ts = server_info['ts']  # type: int

    try:
        state_ts = int(server_info['state']['ts'])
    except (KeyError, TypeError, ValueError):
        state_ts = 0

    # All fields must be present and non-zero for the heuristics to apply.
    if not all([reason, uptime, patch_level, state_ts, last_stop, ts]):
        return False

    boot_time = ts - uptime

    # Marker 1: shortly after a reboot and no patch level reported yet.
    first_update_after_reboot_marker = uptime < 300 and patch_level == -1
    # Marker 2: the last recorded state was an update less than 30 minutes
    # before the (re)boot.
    crash_soon_after_update_marker = boot_time > state_ts > boot_time - 1800 and reason == 'update'
    # Marker 3: no clean agent stop was recorded after that state was written.
    no_proper_shutdown_marker = last_stop < state_ts

    # Only the combination of all three markers counts as an anomaly.
    if first_update_after_reboot_marker and crash_soon_after_update_marker and no_proper_shutdown_marker:
        log_utils.loginfo('Agent anomaly detected: {0}'.format(server_info))
        return True

    return False


if __name__ == '__main__':  # pragma: no cover
    # Manual debugging entry point: build a report with empty server info.
    prepare_kernel_anomaly_report({})