File: //usr/libexec/kcare/python/kcarectl/anomaly.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
import io
import json
import os
import shlex
import tarfile
import time
import uuid
from tempfile import NamedTemporaryFile
from kcarectl import auth, config, http_utils, ipv6_support, kcare, log_utils, utils
from kcarectl.process_utils import run_command
if False: # pragma: no cover
from typing import Any, Dict, List, Optional # noqa: F401
from typing_extensions import Self # noqa: F401
class DataPackage(object):
    """Compressed tar archive builder for anomaly reports (based on DataPackage from eportal).

    Must be used as a context manager: __enter__ creates the archive file,
    __exit__ appends the buffered error messages as errors.log and closes
    the tar.  Collection is best-effort: individual failures are recorded
    via log_error() and shipped inside the archive rather than aborting
    the whole report.
    """
    def __init__(self):
        # type: () -> None
        self._tar = None  # type: Optional[tarfile.TarFile]
        # Error messages accumulated during collection; flushed to errors.log on exit.
        self._errors_buffer = []  # type: List[str]
        # Sum of the uncompressed entry sizes, checked against the configured cap.
        self._total_payload_size = 0
    @property
    def archive_path(self):
        # type: () -> str
        """Path of the archive file; raises RuntimeError outside a context manager."""
        tar = self._ensure_tar_created()
        return str(tar.name)
    def add_stdout(self, arcname, cmd):
        # type: (str, str) -> None
        """Run *cmd* and store its stdout in the archive under *arcname*.

        stderr output (or a failure to spawn the command) is logged as an
        error, but any stdout captured is still added.
        """
        stdout = None
        stderr = None
        try:
            _, stdout, stderr = run_command(shlex.split(cmd), catch_stdout=True, catch_stderr=True)
        except Exception as e:
            stderr = str(e)
        if stderr:
            self.log_error('failed to dump stdout of {0}:\n{1}'.format(cmd, stderr))
        if stdout is not None:
            self.add_file(arcname, data_bytes=utils.bstr(stdout, encoding='utf-8'))
    def add_file(self, arcname, src_path=None, data_bytes=None, skip_limit_check=False):
        # type: (str, Optional[str], Optional[bytes], bool) -> None
        """Add a file (from disk or from memory) to the archive as *arcname*.

        Exactly one of src_path/data_bytes must be given.  The entry is
        skipped (with an error logged) when the source file is missing,
        the filesystem lacks free space, or the total payload limit would
        be exceeded — unless skip_limit_check is set (used for errors.log).
        """
        if src_path is None and data_bytes is None:
            raise ValueError('No src_path or data_bytes provided')
        tar = self._ensure_tar_created()
        entry_size = 0
        if src_path is not None:
            if not os.path.exists(src_path):
                self.log_error('file not found: {0}'.format(src_path))
                return
            entry_size = os.path.getsize(src_path)
        else:
            entry_size = len(data_bytes)  # type: ignore[arg-type]
        if not self._check_required_space(entry_size):
            self.log_error('no available space to store: {0}'.format(arcname))
            return
        if not skip_limit_check and not self._check_total_payload_limit(entry_size):
            self.log_error('skip due to total payload size limit: {0}'.format(arcname))
            return
        try:
            if src_path:
                tar.add(src_path, arcname=arcname)
            else:
                # In-memory payload: build the tar header by hand.
                info = tarfile.TarInfo(arcname)
                info.size = entry_size
                tar.addfile(info, io.BytesIO(data_bytes))  # type: ignore[arg-type]
            self._total_payload_size += entry_size
        except Exception as e:
            self.log_error('failed to store {0}: {1}'.format(arcname, e))
    def add_json(self, arcname, data):
        # type: (str, Dict[str, Any]) -> None
        """Serialize *data* as pretty-printed JSON and add it under *arcname*."""
        try:
            data_bytes = utils.bstr(json.dumps(data, indent=4), encoding='utf-8')
        except TypeError as e:
            # Non-serializable payload: record the problem instead of failing the report.
            self.log_error('failed to dump {0}:\n{1}'.format(arcname, e))
            return
        self.add_file(arcname, data_bytes=data_bytes)
    def _check_required_space(self, entry_size):
        # type: (int) -> bool
        """Return True when the archive's filesystem can hold entry_size more bytes."""
        # here we simplify the check and ignore that the compressed file size will be less
        statvfs = os.statvfs(self.archive_path)
        return statvfs.f_frsize * statvfs.f_bfree > entry_size
    def _check_total_payload_limit(self, entry_size):
        # type: (int) -> bool
        """Return True when adding entry_size bytes keeps the report under the configured cap."""
        # here we simplify the check and ignore that the compressed file size will be less
        return config.KERNEL_ANOMALY_REPORT_MAX_SIZE_BYTES > self._total_payload_size + entry_size
    def log_error(self, error_msg):
        # type: (str) -> None
        """Log *error_msg* (file only, not console) and buffer it for errors.log."""
        error_msg = error_msg.strip()
        log_utils.logerror(error_msg, print_msg=False)
        self._errors_buffer.append(error_msg)
    def __enter__(self):
        # type: () -> Self
        # Try the strongest compression first, falling back when the Python
        # build lacks the corresponding compression module.
        for compression_mode in ('w:xz', 'w:bz2', 'w:gz'):  # pragma: no branch
            tmpfile = NamedTemporaryFile(suffix='.tar.{0}'.format(compression_mode[2:]), delete=False)
            tmpfile.close()
            try:
                log_utils.loginfo('Creating DataPackage: {0}'.format(tmpfile.name), print_msg=False)
                self._tar = tarfile.open(name=tmpfile.name, mode=compression_mode)
                return self
            except Exception as err:  # pragma: no cover
                if os.path.exists(tmpfile.name):
                    os.unlink(tmpfile.name)
                self._tar = None
                # Only compression-related failures trigger the fallback; anything
                # else (e.g. permission errors) is re-raised immediately.
                if not isinstance(err, tarfile.CompressionError):
                    raise
        raise tarfile.CompressionError('No supported compression method found')  # pragma: no cover
    def __exit__(self, exc_type, exc_val, exc_tb):
        # type: (Optional[type[BaseException]], Optional[BaseException], Any) -> bool
        # Finalize: flush buffered errors into the archive, then close the tar.
        if self._errors_buffer:  # pragma: no branch
            errors = '\n'.join(self._errors_buffer) + '\n'
            # skip_limit_check: errors.log must make it in even at the payload cap.
            self.add_file('errors.log', data_bytes=utils.bstr(errors), skip_limit_check=True)
        if self._tar:  # pragma: no branch
            self._tar.close()
        if exc_val:
            # On failure, drop the partial archive and propagate the exception.
            self.remove_archive()
            return False
        return True
    @utils.catch_errors(logger=log_utils.logwarn)
    def remove_archive(self):
        # type: () -> None
        """Delete the archive file from disk (best-effort; errors are logged)."""
        if self._tar and os.path.exists(self.archive_path):  # pragma: no branch
            os.unlink(self.archive_path)
    def _ensure_tar_created(self):
        # type: () -> tarfile.TarFile
        """Return the open tarfile, raising if used outside a context manager."""
        if not self._tar:
            raise RuntimeError('DataPackage should be used as a context manager')
        return self._tar
@utils.catch_errors(logger=log_utils.logwarn)
def send_data_package(data_package):
    # type: (DataPackage) -> str
    """Upload the DataPackage archive to the patch server.

    :param data_package: DataPackage instance to send
    :return: Upload name (package identifier)
    """
    archive = data_package.archive_path
    fname = os.path.basename(archive)
    # Slice from the FIRST dot so multi-part extensions
    # (.tar.xz / .tar.bz2 / .tar.gz) are preserved in full.
    dot = fname.find('.')
    suffix = fname[dot:] if dot != -1 else ''
    # A fresh UUID keeps uploads from different servers/runs from colliding.
    upload_name = '{0}{1}'.format(uuid.uuid4(), suffix)
    upload_url = ipv6_support.get_patch_server() + '/upload/kernel-anomaly/' + upload_name
    http_utils.upload_file(archive, upload_url=upload_url, auth_string=auth.get_http_auth_string())
    return upload_name
def make_manifest_json():  # pragma: no cover
    # type: () -> Dict[str, Any]
    """Build the manifest entry describing this kernel-anomaly report archive."""
    manifest = {}  # type: Dict[str, Any]
    manifest["schema_version"] = 1
    manifest["type"] = "kernel-anomaly"
    manifest["time_created"] = int(time.time())
    return manifest
def copy_recent_files(files, data_package, archive_prefix):
    # type: (List[str], DataPackage, str) -> None
    """adds recent files for the last hour to the given data package starting from the newest one"""
    cutoff = time.time() - 3600
    for path, ctime in kcare.sort_files_by_ctime(files):
        if ctime < cutoff:
            # Entries are ordered newest-first, so everything that follows is older too.
            break
        flat_name = path.replace('/', '_')
        data_package.add_file('{0}/{1}'.format(archive_prefix, flat_name), src_path=path)
@utils.catch_errors(logger=log_utils.logwarn)
def prepare_kernel_anomaly_report(server_info):  # pragma: no cover
    # type: (Dict[str, Any]) -> DataPackage
    """Collect system logs, package lists and kdump artifacts into a DataPackage.

    :param server_info: agent state dict stored as server_info.json in the report
    :return: the finalized (closed) DataPackage
    """
    pkg = DataPackage()
    with pkg:
        # System logs: only the flavor(s) present on this distro.
        for arcname, log_path in (('messages', '/var/log/messages'), ('syslog', '/var/log/syslog')):
            if os.path.exists(log_path):
                pkg.add_stdout(arcname, 'tail -n10000 {0}'.format(log_path))
        pkg.add_stdout('kcarectl.log', 'tail -n10000 /var/log/kcarectl.log')
        pkg.add_stdout('dmesg', 'dmesg')
        pkg.add_stdout('ls_var_cache_kcare', 'ls -lR /var/cache/kcare/')
        # Installed-package inventory: prefer rpm, fall back to dpkg.
        if os.path.exists('/usr/bin/rpm') or os.path.exists('/bin/rpm'):
            packages_cmd = r'rpm -q -a --queryformat="%{N}|%{V}-%{R}|%{arch}|%{INSTALLTIME:date}\n"'
        elif os.path.exists('/usr/bin/dpkg'):
            packages_cmd = r'/usr/bin/dpkg-query -W -f "${binary:Package}|${Version}|${Architecture}\n"'
            pkg.add_file('dpkg.log', src_path='/var/log/dpkg.log')
        else:
            packages_cmd = 'echo "unknown package manager"'
        pkg.add_stdout('packages.list', packages_cmd)
        # tar fails to add files from /proc directly so we first read them to memory
        for arcname, proc_path in (('proc_version', '/proc/version'), ('proc_modules', '/proc/modules')):
            with open(proc_path) as f:
                pkg.add_file(arcname, data_bytes=utils.bstr(f.read()))
        pkg.add_file('kcare.conf', src_path='/etc/sysconfig/kcare/kcare.conf')
        pkg.add_json('server_info.json', server_info)
        pkg.add_json('manifest.json', make_manifest_json())
        # kdump / crashreporter artifacts (best-effort: failures go to errors.log).
        pkg.add_file('kdump.conf', src_path='/etc/kdump.conf')
        pkg.add_stdout('ls_kdump', 'ls -lR {0}'.format(kcare.get_kdump_root()))
        try:
            copy_recent_files(kcare.list_kdump_txt_files(), pkg, 'kdump')
        except Exception as e:
            pkg.log_error('failed to copy kdumps:\n{0}'.format(e))
        try:
            copy_recent_files(kcare.list_crashreporter_log_files(), pkg, 'crashreporter')
        except Exception as e:
            pkg.log_error('failed to copy crashreporter artifacts:\n{0}'.format(e))
    return pkg
@utils.catch_errors(logger=log_utils.logwarn, default_return=False)
def detect_anomaly(server_info):
    # type: (Dict[str, Any]) -> bool
    """Return True when the agent state looks like a crash/reboot anomaly.

    Taken from eportal - anomalies::detect_agent_reboot.
    """
    reason = server_info['reason']  # type: str
    uptime = int(server_info['uptime'])
    # NOTE: `or '-1'` maps both a missing and a falsy patch_level to -1 (as in eportal).
    patch_level = int(server_info.get('patch_level') or '-1')
    last_stop = int(server_info['last_stop'])
    ts = server_info['ts']  # type: int
    try:
        state_ts = int(server_info['state']['ts'])
    except (KeyError, TypeError, ValueError):
        state_ts = 0
    # Every signal must be present and non-zero before we can reason about them.
    if not all((reason, uptime, patch_level, state_ts, last_stop, ts)):
        return False
    boot_moment = ts - uptime
    # Marker 1: machine rebooted recently and no patch level has been applied yet.
    fresh_boot_unpatched = uptime < 300 and patch_level == -1
    # Marker 2: the last recorded state was an update within 30 min before the reboot.
    crashed_soon_after_update = reason == 'update' and boot_moment > state_ts > boot_moment - 1800
    # Marker 3: the agent state is newer than the last clean stop.
    unclean_shutdown = last_stop < state_ts
    if fresh_boot_unpatched and crashed_soon_after_update and unclean_shutdown:
        log_utils.loginfo('Agent anomaly detected: {0}'.format(server_info))
        return True
    return False
if __name__ == '__main__':  # pragma: no cover
    # Manual debugging entry point: build a report archive with empty server info.
    prepare_kernel_anomaly_report({})