From 481f6bb9251552f2d874eb40ad7a685cb0ea0545 Mon Sep 17 00:00:00 2001 From: flashnuke Date: Sun, 6 Oct 2024 00:11:02 +0300 Subject: [PATCH] refactor a bit --- misc/helper_definitions.py | 16 ++++-- misc/output_manager.py | 6 +++ systamer.py | 104 ++++++++++++++++++++----------------- 3 files changed, 76 insertions(+), 50 deletions(-) diff --git a/misc/helper_definitions.py b/misc/helper_definitions.py index 2956286..95a3a15 100644 --- a/misc/helper_definitions.py +++ b/misc/helper_definitions.py @@ -43,7 +43,19 @@ def generate_cmd_dict_msg(description, commands: dict) -> str: return f"```{table}```" -def generate_proc_dict_msg(description, processes: list) -> List[str]: +def generate_machine_stats_msg(description, cpu_usage, memory_info, disk_usage) -> str: + header = f"{description}\n| Resource | Usage |\n" + separator = "|------------|---------------------------|\n" + + table = header + separator + table += f"| {'CPU':<10} | {f'{cpu_usage}%':<25} |\n" + table += f"| {'Memory':<10} | {f'{memory_info.percent}% ({memory_info.used / (1024 ** 3):.1f}/{memory_info.total / (1024 ** 3):.1f} GB)':<25} |\n" + table += f"| {'Disk':<10} | {f'{disk_usage.percent}% ({disk_usage.used / (1024 ** 3):.1f}/{disk_usage.total / (1024 ** 3):.1f} GB)':<25} |\n" + + return f"```{table}```" + + +def generate_proc_stats_msg(description, processes: list) -> List[str]: table_header = f"{description}\n| PID | Name | CPU (%) | Mem (%) |\n" separator = "|-------|----------------------|---------|----------|\n" table = table_header + separator @@ -75,5 +87,3 @@ def load_config(conf_path: Path) -> Dict[str, Any]: except json.JSONDecodeError as e: print_error(f"Error decoding config -> {conf_path}.") raise e - - diff --git a/misc/output_manager.py b/misc/output_manager.py index 35a6927..a1ffe39 100644 --- a/misc/output_manager.py +++ b/misc/output_manager.py @@ -59,4 +59,10 @@ def print_cmd(text): TG_BANNER = """ █▀ █▄█ █▀ ▀█▀ ▄▀█ █▀▄▀█ █▀▀ █▀█ ▄█ ░█░ ▄█ ░█░ █▀█ █░▀░█ ██▄ █▀▄ +""" + +START_INTRO = """ +\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- +by [@flashnuke](https://github.com/flashnuke/SysTamer) +\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n """ \ No newline at end of file diff --git a/systamer.py b/systamer.py index 66029e5..9c59754 100755 --- a/systamer.py +++ b/systamer.py @@ -36,7 +36,9 @@ # Ⓒ by https://github.com/flashnuke Ⓒ................................................................................ # -------------------------------------------------------------------------------------------------------------------- -def require_authentication(func, *args, **kwargs): +# ============= WRAPPERS ============= + +def require_authentication(func, *_args, **_kwargs): async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): if context.user_data.get("authenticated", False) or not SysTamer.should_authenticate(): # User is authenticated, proceed with the function @@ -48,7 +50,7 @@ async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, return _impl -def log_action(func, *args, **kwargs): +def log_action(func, *_args, **_kwargs): async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): message_text = update.message.text # This will contain the full command, e.g., "/start arg1 arg2" parts = message_text.split() @@ -61,6 +63,18 @@ async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, return _impl +def check_for_permission(func, *_args, **_kwargs): + async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): + try: + return await func(self, update, context, *args, **kwargs) + except PermissionError: + if update.effective_message: + await update.effective_message.reply_text("No permissions for this action, try running as superuser.") + if update.callback_query: + await SysTamer.delete_message(update, context) + return _impl + + class SysTamer: _BROWSE_IGNORE_PATH = ".browseignore" _PASSWORD = str() @@ -86,6 +100,17 @@ def __init__(self, json_conf: dict): self._browse_path_dict = dict() self._ignored_paths = SysTamer.load_ignore_paths() + # ============= static method helpers ============= + + @staticmethod + def build_navigate_keyboard(all_buttons: List[InlineKeyboardButton]) -> List[List[InlineKeyboardButton]]: + # separate navigation buttons from path buttons... + navigation_buttons = all_buttons[-1] if isinstance(all_buttons[-1], list) else [all_buttons[-1]] + regular_buttons = all_buttons[:-1] if isinstance(all_buttons[-1], list) else all_buttons + keyboard = [regular_buttons[i:i + 2] for i in range(0, len(regular_buttons), 2)] + keyboard.append(navigation_buttons) + return keyboard + @staticmethod async def delete_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: try: @@ -96,17 +121,6 @@ async def delete_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> except telegram.error.BadRequest as e: print_error(f"Error deleting message: {e}") - def check_for_permission(func, *args, **kwargs): - async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): - try: - return await func(self, update, context, *args, **kwargs) - except PermissionError: - if update.effective_message: - await update.effective_message.reply_text("No permissions for this action, try running as superuser.") - if update.callback_query: - await SysTamer.delete_message(update, context) - return _impl - @staticmethod def get_update_username(update: Update) -> str: return update.effective_user.username if update.effective_user.username else update.effective_user.id @@ -171,7 +185,7 @@ def deauthenticate(context: ContextTypes.DEFAULT_TYPE): @log_action @require_authentication - async def send_screenshot(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + async def send_screenshot(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): with mss.mss() as sct: screenshot = sct.grab(sct.monitors[0]) # Capture the full screen @@ -191,13 +205,13 @@ async def reply_with_timeout(self, update: Update, async_reply_ptr: Callable[... try: await async_reply_ptr(*args, write_timeout=self._timeout_duration, connect_timeout=self._timeout_duration, read_timeout=self._timeout_duration, **kwargs) - except telegram.error.TimedOut as exc: + except telegram.error.TimedOut as _exc: await update.message.reply_text(f"Request timed out after {self._timeout_duration} seconds.") except telegram.error.NetworkError as exc: await update.message.reply_text(f"Network error occurred: {exc}. Please try again later.") @require_authentication - async def handle_file_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + async def handle_file_upload(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): if not os.path.exists(self._uploads_dir): os.makedirs(self._uploads_dir) file_path = str() @@ -255,7 +269,7 @@ async def handle_file_upload(self, update: Update, context: ContextTypes.DEFAULT @log_action @require_authentication - async def list_uploads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def list_uploads(self, update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None: try: if not os.path.exists(self._uploads_dir): await update.message.reply_text("Upload directory does not exist.") @@ -287,18 +301,13 @@ async def list_uploads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) @log_action @require_authentication - async def system_resource_monitoring(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + async def system_resource_monitoring(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): cpu_usage = psutil.cpu_percent(interval=1) memory_info = psutil.virtual_memory() disk_usage = psutil.disk_usage('/') - response = ( - f"CPU Usage: {cpu_usage}%\n" - f"Memory Usage: {memory_info.percent}% ({memory_info.used / (1024 ** 3):.2f} GB used of {memory_info.total / (1024 ** 3):.2f} GB)\n" - f"Disk Usage: {disk_usage.percent}% ({disk_usage.used / (1024 ** 3):.2f} GB used of {disk_usage.total / (1024 ** 3):.2f} GB)" - ) - - await update.message.reply_text(response) + await update.message.reply_text(generate_machine_stats_msg("MachineStats", cpu_usage, memory_info, disk_usage), + parse_mode="MarkdownV2") @log_action @require_authentication @@ -319,7 +328,8 @@ async def list_processes(self, update: Update, context: ContextTypes.DEFAULT_TYP except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue - table_chunks = generate_proc_dict_msg(f"Processes:{len(processes)},Filters:{context.args if context.args else None}", processes) + table_chunks = generate_proc_stats_msg(f"Processes:{len(processes)}," + f"Filters:{context.args if context.args else None}", processes) for chunk in table_chunks: await update.message.reply_text(f"```{chunk}```", parse_mode="MarkdownV2") @@ -339,16 +349,13 @@ async def kill_process(self, update: Update, context: ContextTypes.DEFAULT_TYPE) await update.message.reply_text("Invalid process ID or process does not exist.") @log_action - async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE): - welcome_message = TG_BANNER + "\n\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n" \ - "by [@flashnuke](https://github.com/flashnuke/SysTamer)" \ - "\n\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n"\ - + generate_cmd_dict_msg("Commands", COMMANDS_DICT) + async def start(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): + welcome_message = TG_BANNER + START_INTRO + generate_cmd_dict_msg("Commands", COMMANDS_DICT) await update.message.reply_text(welcome_message, parse_mode='MarkdownV2', disable_web_page_preview=True) @log_action @require_authentication - async def upload_info(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + async def upload_info(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): upload_message = ( f"Simply send a file, and it will be saved to -> {self._uploads_dir}" ) @@ -367,29 +374,29 @@ def list_files_and_directories(self, path: str): entry_hashed = hashlib.md5(full_path.encode()).hexdigest() self._browse_path_dict[entry_hashed] = full_path - # Ensure that it's a directory or file and not something like NTUSER.DAT if os.path.isdir(full_path): buttons.append(InlineKeyboardButton(entry + '/', callback_data=f"cd {entry_hashed}")) else: buttons.append(InlineKeyboardButton(entry, callback_data=f"file {entry_hashed}")) - # Add "Back" button to navigate to the parent directory if not at the root - # if path != str(Path.home()): # If we are not in the home directory - parent_directory = os.path.dirname(path) # Get parent directory - if os.path.isdir(parent_directory): # Ensure the parent directory is valid + parent_directory = os.path.dirname(path) + if os.path.isdir(parent_directory): parent_hashed = hashlib.md5(parent_directory.encode()).hexdigest() self._browse_path_dict[parent_hashed] = parent_directory - buttons.append(InlineKeyboardButton("⬅️ Back", callback_data=f"cd {parent_hashed}")) - buttons.append(InlineKeyboardButton("❌️ Close", callback_data=f"action close")) + buttons.append([InlineKeyboardButton("⬅️ Back", callback_data=f"cd {parent_hashed}"), + InlineKeyboardButton("❌️ Close", callback_data=f"action close")]) + else: + buttons.append([InlineKeyboardButton("❌️ Close", callback_data=f"action close")]) return buttons @log_action @check_for_permission @require_authentication - async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE): - path = str(Path.home()) # Start from the home directory - buttons = self.list_files_and_directories(path) - keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)] # Group buttons in rows + async def browse(self, update: Update, _context: ContextTypes.DEFAULT_TYPE): + path = str(Path.home()) + all_buttons = self.list_files_and_directories(path) + keyboard = self.build_navigate_keyboard(all_buttons) + reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text('Choose a directory or file:', reply_markup=reply_markup) @@ -399,15 +406,18 @@ async def handle_navigation(self, update: Update, context: ContextTypes.DEFAULT_ query = update.callback_query data = query.data.split(' ', 1) command = data[0] # The command is the first part (e.g., "cd", "file", "action") - print_cmd(f"user {SysTamer.get_update_username(update)}\t|\thandle_navigation received cmd -> {' '.join(data)}" + ('\t|\t(' + self._browse_path_dict.get(data[1]) + ')' if len(data) >= 1 and data[1] in self._browse_path_dict else '')) + print_cmd(f"user {SysTamer.get_update_username(update)}\t|\t" + f"handle_navigation received cmd -> {' '.join(data)}" + + ('\t|\t(' + self._browse_path_dict.get(data[1]) + ')' + if len(data) >= 1 and data[1] in self._browse_path_dict else '')) if command == "cd": # Handle directory navigation hashed_path = data[1] path = self._browse_path_dict.get(hashed_path) if path and os.path.isdir(path): # Ensure the path is a valid directory - buttons = self.list_files_and_directories(path) - keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)] + all_buttons = self.list_files_and_directories(path) + keyboard = self.build_navigate_keyboard(all_buttons) reply_markup = InlineKeyboardMarkup(keyboard) await query.edit_message_text(text=f'Navigating to: {path}', reply_markup=reply_markup) else: @@ -496,7 +506,7 @@ def _build_app(self) -> telegram.ext.Application: return application - def _error_handler(self, update: object, context: telegram.ext.CallbackContext): + def _error_handler(self, _update: object, context: telegram.ext.CallbackContext): try: raise context.error except Exception as exc: