Skip to content
/ hotsos Public
forked from canonical/hotsos

Commit

Permalink
Add a new apparmor helper
Browse files Browse the repository at this point in the history
Resolves: canonical#750
  • Loading branch information
dosaboy committed Oct 17, 2023
1 parent 9ea0f9a commit 3be0fed
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 0 deletions.
4 changes: 4 additions & 0 deletions hotsos/core/host_helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@
SYSCtlFactory,
SYSCtlConfHelper,
)
from .apparmor import ( # noqa: F403,F401
AAProfileFactory,
ApparmorHelper,
)
78 changes: 78 additions & 0 deletions hotsos/core/host_helpers/apparmor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from functools import cached_property

# NOTE: we import direct from searchkit rather than hotsos.core.search to
# avoid circular dependency issues.
from searchkit import (
FileSearcher,
SearchDef,
SequenceSearchDef,
)
from hotsos.core.factory import FactoryBase
from hotsos.core.host_helpers.cli import CLIHelperFile


class AAProfile(object):

def __init__(self, name):
self.name = name
self.mode = ApparmorHelper().get_profile_mode(name)


class ApparmorHelper(object):

@cached_property
def profiles(self):
"""
Fetch all profiles and their mode from apparmor_status.
@return: dictionary of {<mode>: {'profiles': <list>, 'count': <int>}}
"""
s = FileSearcher()
seqdef = SequenceSearchDef(
start=SearchDef(r"(\d+) (\S+) are in (\S+) mode."),
body=SearchDef(r"\s+(\S+)"),
tag="aastatus")
info = {}
with CLIHelperFile() as cli:
s.add(seqdef, path=cli.apparmor_status())
results = s.run()
for section in results.find_sequence_sections(seqdef).values():
count = 0
mode = None
is_profiles = False
for result in section:
if result.tag == seqdef.start_tag:
count = int(result.get(1))
is_profiles = result.get(2) == 'profiles'
mode = result.get(3)
if mode not in info:
info[mode] = {'profiles': [], 'count': count}
elif result.tag == seqdef.body_tag:
if not is_profiles or count == 0:
continue

info[mode]['profiles'].append(result.get(1))

return info

def get_profile_mode(self, name):
for mode, profiles in self.profiles.items():
if name in profiles['profiles']:
return mode

@property
def profiles_enforce(self):
return self.profiles.get('enforce', {}).get('profiles', [])

@property
def profiles_complain(self):
return self.profiles.get('complain', {}).get('profiles', [])


class AAProfileFactory(FactoryBase):
"""
Dynamically create AAProfile objects using profile name.
"""

def __getattr__(self, profile):
return AAProfile(profile)
3 changes: 3 additions & 0 deletions hotsos/core/host_helpers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,9 @@ def command_catalog(self):
'apt_config_dump':
[BinCmd('apt-config dump'),
FileCmd('sos_commands/apt/apt-config_dump')],
'apparmor_status':
[BinCmd('apparmor_status'),
FileCmd('sos_commands/apparmor/apparmor_status')],
'ceph_daemon_osd_config_show':
[BinCmd('ceph daemon osd.{osd_id} config show',
json_decode=True),
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/test_host_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,25 @@ def test_sectionalconfig_base(self):
expanded = cfg.get('c-key', expand_to_list=True)
self.assertEqual(expanded, list(range(2, 9)) + list(range(10, 32)))
self.assertEqual(cfg.squash_int_range(expanded), '2-8,10-31')


class TestApparmorHelper(utils.BaseTestCase):

def test_aa_status_profiles(self):
helper = host_helpers.ApparmorHelper()
profiles = helper.profiles
self.assertEqual(len(profiles), 2)
self.assertEqual(profiles['enforce']['count'], 253)
self.assertEqual(len(profiles['enforce']['profiles']), 253)
self.assertEqual(profiles['enforce']['profiles'][-1], 'virt-aa-helper')
self.assertEqual(helper.profiles_enforce,
profiles['enforce']['profiles'])
self.assertEqual(profiles['complain']['count'], 0)
self.assertEqual(len(profiles['complain']['profiles']), 0)
self.assertEqual(helper.profiles_complain, [])

def test_aa_profile_factory(self):
profile = getattr(host_helpers.apparmor.AAProfileFactory(),
'virt-aa-helper')
self.assertEqual(profile.name, 'virt-aa-helper')
self.assertEqual(profile.mode, 'enforce')

0 comments on commit 3be0fed

Please sign in to comment.