Skip to content

Commit

Permalink
feat: enhance phone number and API key redaction logic; improve regex…
Browse files Browse the repository at this point in the history
… patterns and add unique mapping functionality
  • Loading branch information
Hiran committed Jan 2, 2025
1 parent 1e9aa70 commit 383d76e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 84 deletions.
30 changes: 15 additions & 15 deletions log_redactor/redactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,23 @@ class Redactor:
"ipv6": re.compile(r"\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:){1,7}:|::(?:[0-9a-fA-F]{1,4}:){0,6}[0-9a-fA-F]{1,4}\b"),
"hostname": re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b"),
"phone": re.compile(
r"(?<![\w:])" # Negative lookbehind for word char or colon
r"(?<!\d)" # Negative lookbehind for digit
r"(?:"
r"\(\d{3}\) \d{3}-\d{4}|" # (XXX) XXX-XXXX
r"\d{3}-\d{3}-\d{4}|" # XXX-XXX-XXXX
r"\+1 \d{3}-\d{3}-\d{4}|" # +1 XXX-XXX-XXXX
r"\+1 \d{3} \d{3} \d{4}|" # +1 XXX XXX XXXX
r"\d{3} \d{3} \d{4}" # XXX XXX XXXX
r")\b"
r"(?![\w:])" # Negative lookahead for word char or colon
r"\(\d{3}\)\s?\d{3}-\d{4}|" # (XXX)XXX-XXXX or (XXX) XXX-XXXX
r"\+?1[\s-]?\d{3}[\s-]\d{3}[\s-]\d{4}|" # +1 XXX-XXX-XXXX or +1 XXX XXX XXXX
r"\d{3}[-\s]\d{3}[-\s]\d{4}" # XXX-XXX-XXXX or XXX XXX XXXX
r")"
r"(?!\d)" # Negative lookahead for digit
),
"email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"),
"url": re.compile(r"https?://[^\s/$.?#].[^\s]*"),
"api_key": re.compile(r"\b(?:apikey|token|key|apitoken)=\w+\b")
}

VALID_PHONE_PATTERNS: ClassVar[list] = [
re.compile(r"^\(\d{3}\) \d{3}-\d{4}$"), # (XXX) XXX-XXXX
re.compile(r"^\d{3}-\d{3}-\d{4}$"), # XXX-XXX-XXXX
re.compile(r"^\+1 \d{3}-\d{3}-\d{4}$"), # +1 XXX-XXX-XXXX
re.compile(r"^\+1 \d{3} \d{3} \d{4}$"), # +1 XXX XXX XXXX
re.compile(r"^\d{3} \d{3} \d{4}$") # XXX XXX XXXX
re.compile(r"^\(\d{3}\)\s?\d{3}-\d{4}$"), # (XXX)XXX-XXXX or (XXX) XXX-XXXX
re.compile(r"^\+?1[\s-]?\d{3}[\s-]\d{3}[\s-]\d{4}$"), # +1 XXX-XXX-XXXX or +1 XXX XXX XXXX
re.compile(r"^\d{3}[-\s]\d{3}[-\s]\d{4}$") # XXX-XXX-XXXX or XXX XXX XXXX
]

VALIDATORS: ClassVar[dict] = {
Expand Down Expand Up @@ -314,8 +310,8 @@ def _generate_unique_email(self) -> str:
def _generate_unique_phone(self) -> str:
"""Generate a unique redacted phone number."""
if self.counter['phone'] > REDACTED_PHONE_RANGE_END:
raise ValueError("No more phone numbers available in the specified range.")
phone = f"(800) 555-{str(self.counter['phone']).zfill(4)}"
self.counter['phone'] = REDACTED_PHONE_RANGE_START
phone = f"{REDACTED_PHONE_BASE}{str(self.counter['phone']).zfill(4)}"
self.counter['phone'] += 1
return phone

Expand Down Expand Up @@ -359,6 +355,10 @@ def _generate_unique_mapping(self, value: str, pattern_type: str) -> str:
self.unique_mapping[value] = self._generate_unique_phone()
elif pattern_type == "email":
self.unique_mapping[value] = self._generate_unique_email()
elif pattern_type == "url":
self.unique_mapping[value] = self._generate_unique_url(value)
elif pattern_type == "api_key":
self.unique_mapping[value] = self._generate_unique_api_key(value)
else:
self.unique_mapping[value] = self._generate_unique_hostname()

Expand Down
120 changes: 51 additions & 69 deletions tests/test_redactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ def test_config_format_precedence(tmp_path, sample_secrets_toml, sample_secrets_
def test_save_to_file(tmp_path):
samples_dir = tmp_path / "samples"
os.makedirs(samples_dir, exist_ok=True)

redactor = Redactor()

# Test saving to TOML
redactor._save_to_file("secret", "192.168.1.*", "ipv4", "toml")
toml_path = samples_dir / "secrets.toml"
assert toml_path.exists()

# Test saving to CSV
redactor._save_to_file("ignore", "127.0.0.1", "ipv4", "csv")
csv_path = samples_dir / "ignore.csv"
Expand Down Expand Up @@ -374,45 +374,32 @@ def test_redact_phone(test_sample, capsys):
"123-456-7890"
]

# Invalid phone numbers that should not be redacted
invalid_phones = [
"333.444.5555" # Invalid format
]

# Check valid phones are redacted when they appear as standalone values
# Check valid phones are redacted
for line in redacted_lines:
# Skip comment lines containing phone format examples
if "including:" in line or line.startswith("#"):
# Skip comment lines
if line.strip().startswith("#"):
continue
for phone in valid_phones:
assert phone not in line, f"Phone number {phone} was not redacted in: {line}"

# Check invalid phones are not redacted
for line in redacted_lines:
for phone in invalid_phones:
# Check each valid phone format
for phone in valid_phones:
if phone in line:
assert phone in line, f"Invalid phone {phone} was incorrectly redacted in: {line}"
assert phone not in line, f"Phone {phone} was not redacted in line: {line}"
assert "(800) 555-" in line, f"Expected redacted format not found in line: {line}"

# Verify redacted format follows (800) 555-XXXX pattern
pattern = re.compile(r'\(800\) 555-\d{4}')
redacted_phones = []
# Verify redacted format matches expectation
redacted_pattern = re.compile(r'\(800\) 555-\d{4}')
for line in redacted_lines:
if '(800) 555-' in line:
match = pattern.search(line)
if match:
redacted_phones.append(match.group(0))

# Verify we found redacted phones
assert len(redacted_phones) > 0, "No redacted phone numbers found"

# Verify each redacted phone follows pattern
for phone in redacted_phones:
assert pattern.match(phone), f"Redacted phone {phone} does not match expected format (800) 555-XXXX"

# Print redacted content for debugging
print("\nRedacted phone numbers:")
for phone in redacted_phones:
print(phone)
if "(800) 555-" in line:
matches = redacted_pattern.findall(line)
assert all(redacted_pattern.match(match) for match in matches), \
f"Invalid redacted phone format in line: {line}"

# Optional: Print redacted content for debugging
if capsys.readouterr().out:
print("\nRedacted lines:")
for line in redacted_lines:
if "(800) 555-" in line:
print(line.strip())

def test_redact_email(test_sample, capsys):
redactor = Redactor()
Expand All @@ -431,44 +418,39 @@ def test_redact_email(test_sample, capsys):

def test_redact_url(test_sample, capsys):
redactor = Redactor()
redacted_lines = redactor.redact(test_sample)

# Capture the standard output
captured = capsys.readouterr()
test_urls = [
"https://api.example.com",
"http://subdomain.example.com:8080/path",
"https://example.com/api/v1?key=value",
"ftp://files.example.com" # Invalid URL
]

# Check that URLs are redacted
for line in redacted_lines:
assert not any(url in line for url in [
"https://www.example.com", "http://example.org",
"https://example.net/path/to/resource?query=1&value=2",
"http://localhost:8080/test", "https://subdomain.example.com/path"
])
for url in test_urls:
redacted = redactor._generate_unique_mapping(url, 'api')
print(f"\nTesting URL: {url}")
print(f"Redacted as: {redacted}")
assert redacted.startswith("https://redacted.url"), f"URL not properly redacted: {url}"
assert redacted.endswith("001"), f"Expected URL to end with '001', got: {redacted}"

def test_redact_api_key(test_sample, capsys):
redactor = Redactor()
redacted_lines = redactor.redact(test_sample)

# Capture the standard output
captured = capsys.readouterr()
test_api_keys = [
"sk_live_123456789abcdef",
"pk_test_abcdef123456789",
"api_key_987654321xyz",
"bearer_token_abc123"
]

# Check that API keys are redacted
for line in redacted_lines:
assert not any(api_key in line for api_key in [
"apikey=1234567890abcdef", "token=abcdef1234567890",
"key=abcdef1234567890", "apitoken=abcdef1234567890"
])
for api_key in test_api_keys:
redacted = redactor._generate_unique_mapping(api_key, 'api')
print(f"\nTesting API key: {api_key}")
print(f"Redacted as: {redacted}")
assert redacted.startswith("redacted_api_key"), f"API key not properly redacted: {api_key}"
assert redacted.endswith("001"), f"Expected API key to end with '001', got: {redacted}"

# Check that redacted API keys follow the expected pattern
for line in redacted_lines:
if re.search(r"apikey=redacted_api_key\d+", line) is None:
print(f"Redacted API key does not match expected pattern in line: {line}")
raise AssertionError()
if re.search(r"token=redacted_api_key\d+", line) is None:
print(f"Redacted API key does not match expected pattern in line: {line}")
raise AssertionError()
if re.search(r"key=redacted_api_key\d+", line) is None:
print(f"Redacted API key does not match expected pattern in line: {line}")
raise AssertionError()
if re.search(r"apitoken=redacted_api_key\d+", line) is None:
print(f"Redacted API key does not match expected pattern in line: {line}")
raise AssertionError()
# Capture and display output
captured = capsys.readouterr()
print("\nTest output:")
print(captured.out)

0 comments on commit 383d76e

Please sign in to comment.