sp-hydra-veil-core/core/models/policy/PrivilegePolicy.py

110 lines
3.6 KiB
Python

from core.Constants import Constants
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError
from core.models.BasePolicy import BasePolicy
from packaging import version
from packaging.version import InvalidVersion
from subprocess import CalledProcessError
import os
import pwd
import re
import shutil
import subprocess
class PrivilegePolicy(BasePolicy):
    """Policy that lets the current user run ``wg-quick up`` as root without a password.

    The policy is a single sudoers-style rule which is written to
    ``Constants.HV_PRIVILEGE_POLICY_PATH`` through ``pkexec`` (the location of
    that path is defined elsewhere; presumably a sudoers drop-in directory --
    confirm against ``Constants``).
    """

    def preview(self):
        """Return the policy text that :meth:`instate` would install.

        Raises:
            PolicyAssignmentError: If the effective user cannot be resolved
                or is root.
        """
        return self.__generate(self.__determine_username())

    def instate(self):
        """Install the policy file as root via ``pkexec``.

        The installation is attempted up to three times before giving up.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not available on ``PATH``.
            PolicyInstatementError: If the system is not compatible with the
                policy, or if every installation attempt fails.
            PolicyAssignmentError: If the effective user cannot be resolved
                or is root.
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        if not self.__is_compatible():
            raise PolicyInstatementError('The privilege policy is not compatible.')

        privilege_policy = self.__generate(self.__determine_username())

        # `install` reads the policy from stdin and creates the target file
        # owned by root with mode 0440 in a single privileged step.
        command = (
            'pkexec', 'install', '/dev/stdin', Constants.HV_PRIVILEGE_POLICY_PATH, '-o', 'root', '-m', '440'
        )

        for _ in range(3):
            completed_process = subprocess.run(
                command,
                input=f'{privilege_policy}\n',
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            if completed_process.returncode == 0:
                return

        raise PolicyInstatementError('The privilege policy could not be instated.')

    def revoke(self):
        """Remove the installed policy file as root via ``pkexec``.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not available on ``PATH``.
            PolicyRevocationError: If the removal command does not exit
                successfully (non-zero exit code or termination by a signal).
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        # `returncode` is negative when the child is killed by a signal, so a
        # plain comparison against zero covers both failure modes. Decoding
        # the raw wait status with `>> 8` would report a signal death as
        # success because the signal number lives in the low byte.
        completed_process = subprocess.run(('pkexec', 'rm', Constants.HV_PRIVILEGE_POLICY_PATH))

        if completed_process.returncode != 0:
            raise PolicyRevocationError('The privilege policy could not be revoked.')

    def is_suggestible(self):
        """Return whether the policy can be offered on this system."""
        return self.__is_compatible()

    def is_instated(self):
        """Return whether the policy file currently exists."""
        return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)

    @staticmethod
    def __determine_username():
        """Return the login name of the effective user.

        Raises:
            PolicyAssignmentError: If the effective UID has no password
                database entry, or if the effective user is root.
        """
        try:
            password_database_entry = pwd.getpwuid(os.geteuid())
        except (OSError, KeyError) as error:
            raise PolicyAssignmentError(
                'The privilege policy could not be assigned to the current user.'
            ) from error

        # The policy is never assigned to root itself.
        if password_database_entry.pw_uid == 0:
            raise PolicyAssignmentError('The privilege policy could not be assigned to the current user.')

        return password_database_entry.pw_name

    @staticmethod
    def __generate(username: str):
        """Build the policy text granting *username* password-less ``wg-quick up``.

        The command arguments are written as a ``^...$`` regular expression
        that only matches ``wg.conf`` files located in numerically named
        directories below ``Constants.HV_SYSTEM_PROFILE_CONFIG_PATH``.
        """
        return '\n'.join((
            f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$',
        ))

    @staticmethod
    def __is_compatible():
        """Return whether sudo and ``wg-quick`` on this system support the policy.

        Compatibility requires that ``sudo -V`` runs successfully and reports
        a version of at least 1.9.10, and that ``/usr/bin/wg-quick`` exists
        as a regular file.
        """
        try:
            process_output = subprocess.check_output(('sudo', '-V'), text=True)
        except (CalledProcessError, OSError):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # one that exists but cannot be executed (e.g. PermissionError).
            return False

        output_lines = process_output.splitlines()

        if not output_lines:
            return False

        sudo_version_details = output_lines[0].strip()

        # Take the dotted numeric part of the first line and stop before an
        # optional patch suffix such as the "p5" in "1.9.15p5".
        version_match = re.search(r'(\d[0-9.]+?)(?=p|$)', sudo_version_details)

        if version_match is None:
            return False

        try:
            sudo_version = version.parse(version_match.group(1))
        except InvalidVersion:
            return False

        # The generated rule relies on regular expressions in sudoers command
        # arguments, hence the minimum sudo version.
        return sudo_version >= version.parse('1.9.10') and os.path.isfile('/usr/bin/wg-quick')