Skip to content

Commit

Permalink
Add private scan support with code insight (#204)
Browse files Browse the repository at this point in the history
Co-authored-by: Marta Gómez Macías <mgmacias@google.com>
  • Loading branch information
zeefxd and mgmacias95 authored Dec 2, 2024
1 parent 0fd07b8 commit 75f2e90
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 0 deletions.
89 changes: 89 additions & 0 deletions examples/private_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
Tool for scanning files privately using VirusTotal API.
Supports waiting for scan completion.
"""

import sys
import asyncio
import argparse
from pathlib import Path
import vt
from rich.console import Console
from rich.progress import Progress

console = Console()

async def scan_file_private(
api_key: str,
file_path: Path,
wait: bool = False
) -> None:
"""
Scan a file privately on VirusTotal.
Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task(
"Scanning file...",
total=None if wait else 1
)

analysis = await client.scan_file_private_async(
str(file_path),
wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, 'stats'):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")

def main():
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait",
action="store_true",
help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(
args.apikey,
file_path,
args.wait
))

if __name__ == "__main__":
main()
77 changes: 77 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,3 +507,80 @@ def test_wsgi_app(httpserver, monkeypatch):
response = client.get("/")
assert response.status_code == 200
assert response.json == expected_response

@pytest.fixture
def private_scan_mocks(httpserver):
"""Fixture for mocking private scan API calls."""
upload_url = f"http://{httpserver.host}:{httpserver.port}/upload"

# Mock private upload URL request
httpserver.expect_request(
"/api/v3/private/files/upload_url",
method="GET"
).respond_with_json({
"data": upload_url
})

# Mock file upload response
httpserver.expect_request(
"/upload",
method="POST"
).respond_with_json({
"data": {
"id": "dummy_scan_id",
"type": "private_analysis",
"links": {
"self": "dummy_link"
},
"attributes": {
"status": "queued",
}
}
})

# Add mock for analysis status endpoint
httpserver.expect_request(
"/api/v3/analyses/dummy_scan_id",
method="GET"
).respond_with_json({
"data": {
"id": "dummy_scan_id",
"type": "private_analysis",
"links": {
"self": "dummy_link"
},
"attributes": {
"status": "completed",
"stats": {
"malicious": 0,
"suspicious": 0
}
}
}
})

return upload_url

def verify_analysis(analysis, status="queued"):
"""Helper to verify analysis response."""
assert analysis.id == "dummy_scan_id"
assert analysis.type == "private_analysis"
assert getattr(analysis, "status") == status

def test_scan_file_private(httpserver, private_scan_mocks):
"""Test synchronous private file scanning."""
with new_client(httpserver) as client:
with io.StringIO("test file content") as f:
analysis = client.scan_file_private(f)
verify_analysis(analysis)

@pytest.mark.asyncio
async def test_scan_file_private_async(httpserver, private_scan_mocks):
"""Test asynchronous private file scanning."""
async with new_client(httpserver) as client:
with io.StringIO("test file content") as f:
analysis = await client.scan_file_private_async(
f,
wait_for_completion=True
)
verify_analysis(analysis, status="completed")
52 changes: 52 additions & 0 deletions vt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io
import json
import typing
import os
import aiofiles

import aiohttp

Expand Down Expand Up @@ -973,3 +975,53 @@ async def _wait_for_analysis_completion(self, analysis: Object) -> Object:

async def wait_for_analysis_completion(self, analysis: Object) -> Object:
return await self._wait_for_analysis_completion(analysis)

def scan_file_private(
self,
file: typing.Union[typing.BinaryIO, str],
wait_for_completion: bool = False
) -> Object:
"""Scan file privately.
Args:
file: File to scan (path string or file object)
wait_for_completion: Wait for completion
Returns:
Object: Analysis object with scan results
"""
return make_sync(
self.scan_file_private_async(file, wait_for_completion)
)

async def scan_file_private_async(
self,
file: typing.Union[typing.BinaryIO, str],
wait_for_completion: bool = False
) -> Object:
"""Async version of scan_file_private"""

# Handle string path
if isinstance(file, str):
async with aiofiles.open(file, 'rb') as f:
file_content = io.BytesIO(await f.read())
file_content.name = os.path.basename(file)
return await self.scan_file_private_async(
file_content,
wait_for_completion=wait_for_completion
)

# Create form data for private scan
form = aiohttp.FormData()
form.add_field('file', file)

# Get private upload URL and submit
upload_url = await self.get_data_async("/private/files/upload_url")
response = await self.post_async(upload_url, data=form)

analysis = await self._response_to_object(response)

if wait_for_completion:
analysis = await self._wait_for_analysis_completion(analysis)

return analysis

0 comments on commit 75f2e90

Please sign in to comment.