sp-hydra-veil-core/core/controllers/CapabilityPolicyController.py

113 lines
3.6 KiB
Python

from core.Constants import Constants
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError
from packaging import version
from packaging.version import InvalidVersion
from subprocess import CalledProcessError
import os
import re
import shutil
import subprocess
class CapabilityPolicyController:
    """Generate, install, and remove the AppArmor capability policy.

    The policy text is produced by ``__generate`` and written to
    ``Constants.HV_CAPABILITY_POLICY_PATH``. Installing and removing it are
    performed through ``pkexec`` and are followed by
    ``service apparmor reload``.
    """

    # Number of times ``instate`` runs the privileged install command before
    # giving up.
    _MAX_INSTATEMENT_ATTEMPTS = 3

    @staticmethod
    def preview() -> str:
        """Return the policy text that ``instate`` would install."""
        return CapabilityPolicyController.__generate()

    @staticmethod
    def instate() -> None:
        """Install the capability policy and reload AppArmor.

        The generated policy is piped to ``install /dev/stdin`` running under
        ``pkexec``. The command is attempted up to three times; the first
        attempt that exits with status 0 ends the call.

        Raises:
            CommandNotFoundError: ``pkexec``, ``sh`` or ``service`` is not on
                the PATH.
            PolicyInstatementError: the compatibility check failed, or every
                attempt exited unsuccessfully.
        """
        CapabilityPolicyController.__require_commands('pkexec', 'sh', 'service')

        if not CapabilityPolicyController.__is_compatible():
            raise PolicyInstatementError('The capability policy is not compatible.')

        capability_policy = CapabilityPolicyController.__generate()

        for _ in range(CapabilityPolicyController._MAX_INSTATEMENT_ATTEMPTS):
            # stdout/stderr are captured (and discarded) so the privileged
            # command's output does not leak onto the caller's terminal.
            result = subprocess.run(
                [
                    'pkexec', 'sh', '-c', f'install /dev/stdin {Constants.HV_CAPABILITY_POLICY_PATH} -o root -m 644 && service apparmor reload'
                ],
                input=f'{capability_policy}\n',
                capture_output=True,
                text=True,
            )

            if result.returncode == 0:
                return

        raise PolicyInstatementError('The capability policy could not be instated.')

    @staticmethod
    def revoke() -> None:
        """Remove the installed capability policy and reload AppArmor.

        Raises:
            CommandNotFoundError: ``pkexec``, ``sh`` or ``service`` is not on
                the PATH.
            PolicyRevocationError: the privileged command did not exit with
                status 0, including when it was terminated by a signal.
        """
        CapabilityPolicyController.__require_commands('pkexec', 'sh', 'service')

        # Let subprocess reap the child and decode its status. ``returncode``
        # is negative when the child dies from a signal, so that case is
        # reported as a failure; shifting a raw wait status right by 8 bits
        # would discard the signal number and read such a death as exit 0.
        result = subprocess.run(
            ['pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload']
        )

        if result.returncode != 0:
            raise PolicyRevocationError('The capability policy could not be revoked.')

    @staticmethod
    def is_instated() -> bool:
        """Return whether a file exists at the capability policy path."""
        return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH)

    @staticmethod
    def __require_commands(*commands: str) -> None:
        """Raise CommandNotFoundError for the first command not on the PATH."""
        for command in commands:
            if shutil.which(command) is None:
                raise CommandNotFoundError(command)

    @staticmethod
    def __generate() -> str:
        """Build the AppArmor profile text for ``/usr/bin/bwrap``."""
        return '\n'.join([
            'abi <abi/4.0>,',
            'include <tunables/global>',
            '',
            'profile hv-bwrap /usr/bin/bwrap flags=(unconfined) {',
            '  userns,',
            '',
            '  # Site-specific additions and overrides. See local/README for details.',
            '  include if exists <local/hv-bwrap-userns-restrict>',
            '}'
        ])

    @staticmethod
    def __is_compatible() -> bool:
        """Return whether this system can take the policy.

        True only when ``apparmor_parser -V`` runs successfully, the first
        line of its output contains a parseable version of at least 4.0.0,
        and ``/etc/apparmor.d`` is a directory.
        """
        try:
            process_output = subprocess.check_output(['apparmor_parser', '-V'], text=True)
        except (CalledProcessError, FileNotFoundError):
            return False

        output_lines = process_output.splitlines()

        if not output_lines:
            return False

        apparmor_version_details = output_lines[0].strip()

        # Take the leading dotted-number run and stop before any suffix
        # introduced by 'p', '~', '+' or '-'.
        version_match = re.search(r'(\d[0-9.]+?)(?=[p~+-]|$)', apparmor_version_details)

        if version_match is None:
            return False

        try:
            apparmor_version = version.parse(version_match.group(1))
        except InvalidVersion:
            return False

        return apparmor_version >= version.parse('4.0.0') and os.path.isdir('/etc/apparmor.d')