from core.Constants import Constants
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError
from core.models.BasePolicy import BasePolicy
import os
import pwd
import shutil
import subprocess


class PrivilegePolicy(BasePolicy):
    """Policy backed by the file at ``Constants.HV_PRIVILEGE_POLICY_PATH``.

    The generated rule text names the current user and ``/usr/bin/wg-quick``
    (NOTE(review): the rule looks like sudoers syntax -- confirm where the
    policy file is installed and who consumes it).
    """

    def preview(self) -> str:
        """Return the policy text that would apply to the current user.

        Raises:
            PolicyAssignmentError: If the effective user cannot be resolved
                or is root (uid 0).
        """
        username = self.__determine_username()
        return self.__generate(username)

    def instate(self) -> None:
        """Instate the policy. Currently a no-op."""
        pass

    def revoke(self) -> None:
        """Remove the policy file by running ``pkexec rm`` on its path.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not found on ``PATH``.
            PolicyRevocationError: If the command exits with a non-zero
                status or is terminated by a signal.
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        # subprocess.run() waits for the child and reports a negative
        # returncode when it was killed by a signal. The previous manual
        # decoding (waitpid status >> 8) dropped the signal bits and so
        # treated a signal-terminated child as a success.
        completed_process = subprocess.run(('pkexec', 'rm', Constants.HV_PRIVILEGE_POLICY_PATH))

        if completed_process.returncode != 0:
            raise PolicyRevocationError('The privilege policy could not be revoked.')

    def is_suggestible(self) -> bool:
        """Return whether this policy should be suggested (see ``__is_compatible``)."""
        return self.__is_compatible()

    def is_instated(self) -> bool:
        """Return True if the policy file currently exists."""
        return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)

    @staticmethod
    def __determine_username() -> str:
        """Return the login name of the effective user.

        Raises:
            PolicyAssignmentError: If the password database lookup fails or
                the effective user is root (uid 0).
        """
        try:
            password_database_entry = pwd.getpwuid(os.geteuid())
        except (OSError, KeyError) as error:
            raise PolicyAssignmentError(
                'The privilege policy could not be assigned to the current user.'
            ) from error

        # The policy is never assigned to root.
        if password_database_entry.pw_uid == 0:
            raise PolicyAssignmentError('The privilege policy could not be assigned to the current user.')

        return password_database_entry.pw_name

    @staticmethod
    def __generate(username: str) -> str:
        """Build the policy text for *username*.

        The rules are joined with newlines; there is currently a single rule.
        """
        return '\n'.join((
            f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$',
        ))

    @staticmethod
    def __is_compatible() -> bool:
        """Return whether the policy is compatible; currently always False."""
        return False