Skip to content

Commit

Permalink
feat: enhance email redaction logic; preserve domain for non-redacted…
Browse files Browse the repository at this point in the history
… emails and improve tests
  • Loading branch information
Hiran committed Jan 3, 2025
1 parent cee4cf3 commit 4373af5
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 15 deletions.
17 changes: 15 additions & 2 deletions log_redactor/redactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,12 @@ def is_valid_api_key(key: str) -> bool:

def _generate_unique_email(self) -> str:
    """Generate a unique redacted email address.

    The address is assembled from ``REDACTED_EMAIL_BASE``, the current
    ``self.counter['email']`` value zero-padded to three digits, and
    ``REDACTED_EMAIL_DOMAIN``. The counter is advanced by one so the next
    call yields a different address.

    Returns:
        The newly generated placeholder email address.
    """
    # Local part: fixed base followed by the zero-padded running counter.
    local_part = f"{REDACTED_EMAIL_BASE}{self.counter['email']:03d}"
    # Drop any leading "@" from the configured domain; it is re-added below.
    domain_part = REDACTED_EMAIL_DOMAIN.lstrip('@')

    self.counter['email'] += 1
    return f"{local_part}@{domain_part}"

Expand Down Expand Up @@ -430,7 +435,15 @@ def _generate_unique_mapping(self, value: str, pattern_type: str) -> str:
elif pattern_type == "phone":
redacted_value = self._generate_unique_phone()
elif pattern_type == "email":
redacted_value = self._generate_unique_email()
# Special handling for emails to preserve domain if not redacted
local_part, domain = value.split('@')
redacted_local = f"{REDACTED_EMAIL_BASE}{self.counter['email']:03d}"
# Check if domain should be redacted
if self.should_redact_value(domain, "hostname"):
domain = f"redacted_host{self.counter['hostname']:03d}"
self.counter['hostname'] += 1
redacted_value = f"{redacted_local}@{domain}"
self.counter['email'] += 1
elif pattern_type == "url":
redacted_value = self._generate_unique_url(value)
elif pattern_type == "api_key":
Expand Down
95 changes: 82 additions & 13 deletions tests/test_redactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,20 +510,89 @@ def test_redact_phone(test_phones, invalid_phones, capsys):
for original, redacted in redactor.unique_mapping.items():
print(f"{original} -> {redacted}")

def test_redact_email(test_sample, capsys):
    """Redact the shared sample and check that no known address survives."""
    known_addresses = [
        "john.doe@example.com", "jane.doe@example.com", "admin@example.com",
        "user@example.com", "contact@example.com"
    ]

    redactor = Redactor()
    redacted_lines = redactor.redact(test_sample)

    # Echo the output captured by capsys while redacting.
    captured = capsys.readouterr()
    print("Redacted Lines:\n" + "\n".join(captured))

    # None of the original addresses may appear in any redacted line.
    for line in redacted_lines:
        for email in known_addresses:
            assert email not in line
# def test_redact_email(test_sample, capsys):
# redactor = Redactor()
# redacted_lines = redactor.redact(test_sample)

# # Capture the standard output
# captured = capsys.readouterr()
# print("Redacted Lines:\n" + "\n".join(captured))
# # Check that email addresses are redacted
# for line in redacted_lines:
# assert not any(email in line for email in [
# "john.doe@example.com", "jane.doe@example.com", "admin@example.com",
# "user@example.com", "contact@example.com"
# ])

@pytest.fixture
def test_email_values():
    """Provide ``(original, expected)`` email pairs keyed by redaction type."""
    email_cases = [
        ('john.doe@example.com', 'redacted.user001@example.com'),  # Domain not redacted
        ('admin@test.com', 'redacted.user002@redacted_host001'),  # Domain redacted
        ('user123@domain.co.uk', 'redacted.user003@domain.co.uk'),  # Domain not redacted
    ]
    return {'email': email_cases}

@pytest.fixture
def email_secrets_toml():
    """Provide the TOML text for a secrets file with email and hostname patterns."""
    # The literal's lines start at column 0 so the text matches the source exactly.
    secrets_text = """
[email]
patterns = [
"*@example.com",
"admin@*",
"*@domain.co.uk"
]
[hostname]
patterns = [
"test.com"
]
"""
    return secrets_text

def test_redact_email(test_email_values, email_secrets_toml, tmp_path, capsys):
    """Check email redaction through both the mapping helper and ``redact``.

    A secrets file is written into ``tmp_path`` and the redactor is created
    with that directory as its config path. Each case is checked for the
    redacted local-part prefix, for whether the domain was replaced or kept,
    and for substitution inside a full line of text.
    """
    # Write the secrets configuration into the temporary directory.
    secrets_file = tmp_path / "secrets.toml"
    os.makedirs(tmp_path, exist_ok=True)
    secrets_file.write_text(email_secrets_toml)

    # The redactor receives the directory, not the file path.
    redactor = Redactor(config_path=str(tmp_path))

    for redact_type, test_cases in test_email_values.items():
        print(f"\nTesting {redact_type} redactions:")

        for original, expected in test_cases:
            # NOTE(review): `expected` is only printed here, never compared
            # against `redacted` -- confirm whether an equality check is wanted.
            redacted = redactor._generate_unique_mapping(original, redact_type)
            print(f"\nInput: {original}")
            print(f"Redacted: {redacted}")
            print(f"Expected: {expected}")

            # Overall format, then each half of the address.
            assert redacted.startswith("redacted.user"), f"Wrong prefix: {redacted}"
            local_part, domain = redacted.split('@')
            assert local_part.startswith("redacted.user"), f"Wrong local part prefix: {local_part}"

            if "test.com" not in original:
                # Domain should be preserved.
                assert domain == original.split('@')[1], f"Domain should not be redacted: {domain}"
            else:
                # Domain should be redacted.
                assert domain.startswith("redacted_host"), f"Domain should be redacted: {domain}"

            # The same address embedded in a line must be replaced by the
            # value the mapping helper produced.
            sample_line = f"Email found: {original}"
            output_line = redactor.redact([sample_line])[0]
            print(f"Original: {sample_line}")
            print(f"Redacted: {output_line}")

            assert original not in output_line, f"Original email found in: {output_line}"
            assert redacted in output_line, f"Redacted value not found in: {output_line}"

    # Dump every original -> replacement pair recorded by the redactor.
    print("\nFinal Mappings:")
    for source, replacement in redactor.unique_mapping.items():
        print(f"{source} -> {replacement}")

def test_redact_url(capsys):
redactor = Redactor()
Expand Down

0 comments on commit 4373af5

Please sign in to comment.