Skip to content

Commit

Permalink
refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
flashnuke committed Oct 5, 2024
1 parent c38285a commit 481f6bb
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 50 deletions.
16 changes: 13 additions & 3 deletions misc/helper_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,19 @@ def generate_cmd_dict_msg(description, commands: dict) -> str:
return f"```{table}```"


def generate_proc_dict_msg(description, processes: list) -> List[str]:
def generate_machine_stats_msg(description, cpu_usage, memory_info, disk_usage) -> str:
header = f"{description}\n| Resource | Usage |\n"
separator = "|------------|---------------------------|\n"

table = header + separator
table += f"| {'CPU':<10} | {f'{cpu_usage}%':<25} |\n"
table += f"| {'Memory':<10} | {f'{memory_info.percent}% ({memory_info.used / (1024 ** 3):.1f}/{memory_info.total / (1024 ** 3):.1f} GB)':<25} |\n"
table += f"| {'Disk':<10} | {f'{disk_usage.percent}% ({disk_usage.used / (1024 ** 3):.1f}/{disk_usage.total / (1024 ** 3):.1f} GB)':<25} |\n"

return f"```{table}```"


def generate_proc_stats_msg(description, processes: list) -> List[str]:
table_header = f"{description}\n| PID | Name | CPU (%) | Mem (%) |\n"
separator = "|-------|----------------------|---------|----------|\n"
table = table_header + separator
Expand Down Expand Up @@ -75,5 +87,3 @@ def load_config(conf_path: Path) -> Dict[str, Any]:
except json.JSONDecodeError as e:
print_error(f"Error decoding config -> {conf_path}.")
raise e


6 changes: 6 additions & 0 deletions misc/output_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ def print_cmd(text):
TG_BANNER = """
█▀ █▄█ █▀ ▀█▀ ▄▀█ █▀▄▀█ █▀▀ █▀█
▄█ ░█░ ▄█ ░█░ █▀█ █░▀░█ ██▄ █▀▄
"""

START_INTRO = """
\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-
by [@flashnuke](https://github.com/flashnuke/SysTamer)
\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n
"""
104 changes: 57 additions & 47 deletions systamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
# Ⓒ by https://github.com/flashnuke Ⓒ................................................................................
# --------------------------------------------------------------------------------------------------------------------

def require_authentication(func, *args, **kwargs):
# ============= WRAPPERS =============

def require_authentication(func, *_args, **_kwargs):
async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
if context.user_data.get("authenticated", False) or not SysTamer.should_authenticate():
# User is authenticated, proceed with the function
Expand All @@ -48,7 +50,7 @@ async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args,
return _impl


def log_action(func, *args, **kwargs):
def log_action(func, *_args, **_kwargs):
async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
message_text = update.message.text # This will contain the full command, e.g., "/start arg1 arg2"
parts = message_text.split()
Expand All @@ -61,6 +63,18 @@ async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args,
return _impl


def check_for_permission(func, *_args, **_kwargs):
async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
try:
return await func(self, update, context, *args, **kwargs)
except PermissionError:
if update.effective_message:
await update.effective_message.reply_text("No permissions for this action, try running as superuser.")
if update.callback_query:
await SysTamer.delete_message(update, context)
return _impl


class SysTamer:
_BROWSE_IGNORE_PATH = ".browseignore"
_PASSWORD = str()
Expand All @@ -86,6 +100,17 @@ def __init__(self, json_conf: dict):
self._browse_path_dict = dict()
self._ignored_paths = SysTamer.load_ignore_paths()

# ============= static method helpers =============

@staticmethod
def build_navigate_keyboard(all_buttons: List[InlineKeyboardButton]) -> List[List[InlineKeyboardButton]]:
# separate navigation buttons from path buttons...
navigation_buttons = all_buttons[-1] if isinstance(all_buttons[-1], list) else [all_buttons[-1]]
regular_buttons = all_buttons[:-1] if isinstance(all_buttons[-1], list) else all_buttons
keyboard = [regular_buttons[i:i + 2] for i in range(0, len(regular_buttons), 2)]
keyboard.append(navigation_buttons)
return keyboard

@staticmethod
async def delete_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
Expand All @@ -96,17 +121,6 @@ async def delete_message(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
except telegram.error.BadRequest as e:
print_error(f"Error deleting message: {e}")

def check_for_permission(func, *args, **kwargs):
async def _impl(self, update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
try:
return await func(self, update, context, *args, **kwargs)
except PermissionError:
if update.effective_message:
await update.effective_message.reply_text("No permissions for this action, try running as superuser.")
if update.callback_query:
await SysTamer.delete_message(update, context)
return _impl

@staticmethod
def get_update_username(update: Update) -> str:
return update.effective_user.username if update.effective_user.username else update.effective_user.id
Expand Down Expand Up @@ -171,7 +185,7 @@ def deauthenticate(context: ContextTypes.DEFAULT_TYPE):

@log_action
@require_authentication
async def send_screenshot(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
async def send_screenshot(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
with mss.mss() as sct:
screenshot = sct.grab(sct.monitors[0]) # Capture the full screen

Expand All @@ -191,13 +205,13 @@ async def reply_with_timeout(self, update: Update, async_reply_ptr: Callable[...
try:
await async_reply_ptr(*args, write_timeout=self._timeout_duration,
connect_timeout=self._timeout_duration, read_timeout=self._timeout_duration, **kwargs)
except telegram.error.TimedOut as exc:
except telegram.error.TimedOut as _exc:
await update.message.reply_text(f"Request timed out after {self._timeout_duration} seconds.")
except telegram.error.NetworkError as exc:
await update.message.reply_text(f"Network error occurred: {exc}. Please try again later.")

@require_authentication
async def handle_file_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
async def handle_file_upload(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
if not os.path.exists(self._uploads_dir):
os.makedirs(self._uploads_dir)
file_path = str()
Expand Down Expand Up @@ -255,7 +269,7 @@ async def handle_file_upload(self, update: Update, context: ContextTypes.DEFAULT

@log_action
@require_authentication
async def list_uploads(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
async def list_uploads(self, update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
try:
if not os.path.exists(self._uploads_dir):
await update.message.reply_text("Upload directory does not exist.")
Expand Down Expand Up @@ -287,18 +301,13 @@ async def list_uploads(self, update: Update, context: ContextTypes.DEFAULT_TYPE)

@log_action
@require_authentication
async def system_resource_monitoring(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
async def system_resource_monitoring(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
cpu_usage = psutil.cpu_percent(interval=1)
memory_info = psutil.virtual_memory()
disk_usage = psutil.disk_usage('/')

response = (
f"CPU Usage: {cpu_usage}%\n"
f"Memory Usage: {memory_info.percent}% ({memory_info.used / (1024 ** 3):.2f} GB used of {memory_info.total / (1024 ** 3):.2f} GB)\n"
f"Disk Usage: {disk_usage.percent}% ({disk_usage.used / (1024 ** 3):.2f} GB used of {disk_usage.total / (1024 ** 3):.2f} GB)"
)

await update.message.reply_text(response)
await update.message.reply_text(generate_machine_stats_msg("MachineStats", cpu_usage, memory_info, disk_usage),
parse_mode="MarkdownV2")

@log_action
@require_authentication
Expand All @@ -319,7 +328,8 @@ async def list_processes(self, update: Update, context: ContextTypes.DEFAULT_TYP
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue

table_chunks = generate_proc_dict_msg(f"Processes:{len(processes)},Filters:{context.args if context.args else None}", processes)
table_chunks = generate_proc_stats_msg(f"Processes:{len(processes)},"
f"Filters:{context.args if context.args else None}", processes)
for chunk in table_chunks:
await update.message.reply_text(f"```{chunk}```", parse_mode="MarkdownV2")

Expand All @@ -339,16 +349,13 @@ async def kill_process(self, update: Update, context: ContextTypes.DEFAULT_TYPE)
await update.message.reply_text("Invalid process ID or process does not exist.")

@log_action
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
welcome_message = TG_BANNER + "\n\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n" \
"by [@flashnuke](https://github.com/flashnuke/SysTamer)" \
"\n\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\n"\
+ generate_cmd_dict_msg("Commands", COMMANDS_DICT)
async def start(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
welcome_message = TG_BANNER + START_INTRO + generate_cmd_dict_msg("Commands", COMMANDS_DICT)
await update.message.reply_text(welcome_message, parse_mode='MarkdownV2', disable_web_page_preview=True)

@log_action
@require_authentication
async def upload_info(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
async def upload_info(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
upload_message = (
f"Simply send a file, and it will be saved to -> {self._uploads_dir}"
)
Expand All @@ -367,29 +374,29 @@ def list_files_and_directories(self, path: str):
entry_hashed = hashlib.md5(full_path.encode()).hexdigest()
self._browse_path_dict[entry_hashed] = full_path

# Ensure that it's a directory or file and not something like NTUSER.DAT
if os.path.isdir(full_path):
buttons.append(InlineKeyboardButton(entry + '/', callback_data=f"cd {entry_hashed}"))
else:
buttons.append(InlineKeyboardButton(entry, callback_data=f"file {entry_hashed}"))

# Add "Back" button to navigate to the parent directory if not at the root
# if path != str(Path.home()): # If we are not in the home directory
parent_directory = os.path.dirname(path) # Get parent directory
if os.path.isdir(parent_directory): # Ensure the parent directory is valid
parent_directory = os.path.dirname(path)
if os.path.isdir(parent_directory):
parent_hashed = hashlib.md5(parent_directory.encode()).hexdigest()
self._browse_path_dict[parent_hashed] = parent_directory
buttons.append(InlineKeyboardButton("⬅️ Back", callback_data=f"cd {parent_hashed}"))
buttons.append(InlineKeyboardButton("❌️ Close", callback_data=f"action close"))
buttons.append([InlineKeyboardButton("⬅️ Back", callback_data=f"cd {parent_hashed}"),
InlineKeyboardButton("❌️ Close", callback_data=f"action close")])
else:
buttons.append([InlineKeyboardButton("❌️ Close", callback_data=f"action close")])
return buttons

@log_action
@check_for_permission
@require_authentication
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
path = str(Path.home()) # Start from the home directory
buttons = self.list_files_and_directories(path)
keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)] # Group buttons in rows
async def browse(self, update: Update, _context: ContextTypes.DEFAULT_TYPE):
path = str(Path.home())
all_buttons = self.list_files_and_directories(path)
keyboard = self.build_navigate_keyboard(all_buttons)

reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text('Choose a directory or file:', reply_markup=reply_markup)

Expand All @@ -399,15 +406,18 @@ async def handle_navigation(self, update: Update, context: ContextTypes.DEFAULT_
query = update.callback_query
data = query.data.split(' ', 1)
command = data[0] # The command is the first part (e.g., "cd", "file", "action")
print_cmd(f"user {SysTamer.get_update_username(update)}\t|\thandle_navigation received cmd -> {' '.join(data)}" + ('\t|\t(' + self._browse_path_dict.get(data[1]) + ')' if len(data) >= 1 and data[1] in self._browse_path_dict else ''))
print_cmd(f"user {SysTamer.get_update_username(update)}\t|\t"
f"handle_navigation received cmd -> {' '.join(data)}" +
('\t|\t(' + self._browse_path_dict.get(data[1]) + ')'
if len(data) >= 1 and data[1] in self._browse_path_dict else ''))

if command == "cd": # Handle directory navigation
hashed_path = data[1]
path = self._browse_path_dict.get(hashed_path)

if path and os.path.isdir(path): # Ensure the path is a valid directory
buttons = self.list_files_and_directories(path)
keyboard = [buttons[i:i + 2] for i in range(0, len(buttons), 2)]
all_buttons = self.list_files_and_directories(path)
keyboard = self.build_navigate_keyboard(all_buttons)
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(text=f'Navigating to: {path}', reply_markup=reply_markup)
else:
Expand Down Expand Up @@ -496,7 +506,7 @@ def _build_app(self) -> telegram.ext.Application:

return application

def _error_handler(self, update: object, context: telegram.ext.CallbackContext):
def _error_handler(self, _update: object, context: telegram.ext.CallbackContext):
try:
raise context.error
except Exception as exc:
Expand Down

0 comments on commit 481f6bb

Please sign in to comment.