Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Linting #205

Merged
merged 7 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pylint aiohttp pytest_httpserver pytest_asyncio flask
pip install pytest pylint aiohttp aiofiles pytest_httpserver pytest_asyncio flask
- name: Lint
run: |
pylint --rcfile pylintrc vt/ tests examples/
Expand Down
36 changes: 21 additions & 15 deletions examples/hunting_notifications_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,27 @@ async def get_network_infrastructure(self):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
36 changes: 21 additions & 15 deletions examples/intelligence_search_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,27 @@ async def get_network(self):
contacted_urls = relationships["contacted_urls"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]

await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
}
)

self.networking_infrastructure[checksum]["domains"] = contacted_domains
self.networking_infrastructure[checksum]["ips"] = contacted_ips
Expand Down
9 changes: 5 additions & 4 deletions examples/livehunt_network_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
RULESET_LINK = "https://www.virustotal.com/yara-editor/livehunt/"

EMPTY_DOMAIN_LIST_MSG = (
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
)


Expand Down Expand Up @@ -247,8 +247,9 @@ async def main():
return

rulesets = await get_rulesets()
if (not rulesets and
not (args.add_domain or args.bulk_append or args.bulk_replace)):
if not rulesets and not (
args.add_domain or args.bulk_append or args.bulk_replace
):
print(EMPTY_DOMAIN_LIST_MSG)
sys.exit(1)

Expand Down
127 changes: 59 additions & 68 deletions examples/private_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,77 +13,68 @@

console = Console()


async def scan_file_private(
api_key: str,
file_path: Path,
wait: bool = False
api_key: str, file_path: Path, wait: bool = False
) -> None:
"""
Scan a file privately on VirusTotal.

Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task(
"Scanning file...",
total=None if wait else 1
)

analysis = await client.scan_file_private_async(
str(file_path),
wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, 'stats'):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
"""
Scan a file privately on VirusTotal.

Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task("Scanning file...", total=None if wait else 1)

analysis = await client.scan_file_private_async(
str(file_path), wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, "stats"):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e: # pylint: disable=broad-exception-caught
console.print(f"[red]Error: {e}[/red]")


def main():
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait",
action="store_true",
help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(
args.apikey,
file_path,
args.wait
))
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait", action="store_true", help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(args.apikey, file_path, args.wait))


if __name__ == "__main__":
main()
main()
24 changes: 14 additions & 10 deletions examples/retrohunt_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,23 @@ async def get_network_infrastructure(self, file_obj):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.networking_queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.networking_queue.put(
{"contacted_addresses": contacted_ips, "type": "ips", "file": file_hash}
)
await self.networking_queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
1 change: 1 addition & 0 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ disable=abstract-method,
wrong-import-order,
xrange-builtin,
zip-builtin-not-iterating,
too-many-positional-arguments,


[REPORTS]
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
python_requires=">=3.7.0",
install_requires=[
"aiohttp==3.8.6 ; python_version=='3.7'",
"aiohttp ; python_version>'3.7'"
"aiohttp ; python_version>'3.7'",
"aiofiles"
],
setup_requires=["pytest-runner"],
extras_require={
Expand Down
Loading