Skip to content

Commit

Permalink
Merge pull request #278 from inbo/dec-r4
Browse files Browse the repository at this point in the history
dec r4-export
  • Loading branch information
mainlyIt authored Dec 18, 2024
2 parents bfd97ae + 07ae962 commit 6a7f93a
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 90 deletions.
28 changes: 27 additions & 1 deletion nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,28 @@ http {
include mime.types;
default_type application/octet-stream;

# Global timeout settings
proxy_connect_timeout 300;
proxy_send_timeout 300;
proxy_read_timeout 300;
send_timeout 300;

sendfile on;
keepalive_timeout 65;

# Buffering settings for large responses
proxy_buffering on;
proxy_buffer_size 16k;
proxy_buffers 8 16k;
proxy_busy_buffers_size 32k;

server {
listen 80;
server_name uat.vespadb.be;

# Increase client body size limit if needed
client_max_body_size 20M;

location /static/ {
alias /workspaces/vespadb/collected_static/;
}
Expand All @@ -29,13 +44,20 @@ http {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;

proxy_connect_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
}
}

server {
listen 80;
server_name data.vespawatch.be;

# Increase client body size limit if needed
client_max_body_size 20M;

location /static/ {
alias /workspaces/vespadb/collected_static/;
}
Expand All @@ -50,6 +72,10 @@ http {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;

proxy_connect_timeout 300s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
}
}
}
}
260 changes: 171 additions & 89 deletions vespadb/observations/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@
import os
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_log,
after_log,
)
from typing import Generator, Optional
from django.db import OperationalError, connection, transaction
from django.core.exceptions import ValidationError
import psycopg2
from django.http import FileResponse
import os
import tempfile
Expand Down Expand Up @@ -75,6 +86,13 @@
"notes", "eradication_result", "wn_id", "wn_validation_status", "nest_status"
]
BATCH_SIZE = 1000
class ExportError(Exception):
"""Custom exception for export-related errors."""
pass

class QueryTimeoutError(Exception):
"""Custom exception for query timeout errors."""
pass

class ObservationsViewSet(ModelViewSet): # noqa: PLR0904
"""ViewSet for the Observation model."""
Expand Down Expand Up @@ -719,17 +737,131 @@ def _prepare_row_data(
logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}")
return [""] * len(CSV_HEADERS) # Return empty row in case of error

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((OperationalError, psycopg2.OperationalError)),
before=before_log(logger, logging.INFO),
after=before_log(logger, logging.INFO)
)
def get_queryset_count(self, queryset: QuerySet) -> int:
"""Get queryset count with retry logic."""
try:
with transaction.atomic(), connection.cursor() as cursor:
cursor.execute('SET statement_timeout TO 30000') # 30 seconds timeout
return int(queryset.count())
except (OperationalError, psycopg2.OperationalError) as e:
logger.error(f"Error getting queryset count: {str(e)}")
raise QueryTimeoutError("Query timed out while getting count") from e

def get_chunk_with_retries(
self,
queryset: QuerySet,
start: int,
batch_size: int,
max_retries: int = 3
) -> Optional[List[Observation]]:
"""Get a chunk of data with retries and error handling."""
for attempt in range(max_retries):
try:
with transaction.atomic(), connection.cursor() as cursor:
cursor.execute('SET statement_timeout TO 30000')
chunk = list(
queryset.select_related(
'province',
'municipality',
'reserved_by'
)[start:start + batch_size]
)
return chunk
except (OperationalError, psycopg2.OperationalError) as e:
if attempt == max_retries - 1:
logger.error(f"Failed to get chunk after {max_retries} attempts: {str(e)}")
return None
wait_time = (2 ** attempt) * 1 # Exponential backoff
logger.warning(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
time.sleep(wait_time)
return None

def create_csv_generator(
self,
queryset: QuerySet,
is_admin: bool,
user_municipality_ids: Set[str],
batch_size: int = BATCH_SIZE
) -> Generator[str, None, None]:
"""Create a generator for CSV streaming with improved error handling."""
buffer = io.StringIO()
writer = csv.writer(buffer)

# Write headers
writer.writerow(CSV_HEADERS)
yield buffer.getvalue()
buffer.seek(0)
buffer.truncate(0)

total_processed = 0
successful_writes = 0
error_count = 0

try:
total_count = self.get_queryset_count(queryset)

# Process in chunks
start = 0
while True:
chunk = self.get_chunk_with_retries(queryset, start, batch_size)
if not chunk:
break

for observation in chunk:
try:
row_data = self._prepare_row_data(
observation,
is_admin,
user_municipality_ids
)
writer.writerow(row_data)
successful_writes += 1
except Exception as e:
error_count += 1
logger.error(f"Error processing observation {observation.id}: {str(e)}")
if error_count > total_count * 0.1: # If more than 10% errors
raise ExportError("Too many errors during export")
continue

data = buffer.getvalue()
yield data
buffer.seek(0)
buffer.truncate(0)

total_processed += len(chunk)
progress = (total_processed / total_count) * 100 if total_count else 0
logger.info(
f"Export progress: {progress:.1f}% ({total_processed}/{total_count}). "
f"Successful: {successful_writes}, Errors: {error_count}"
)

start += batch_size

except Exception as e:
logger.exception("Error in CSV generator")
raise ExportError(f"Export failed: {str(e)}") from e
finally:
buffer.close()

@method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
@action(detail=False, methods=["get"], permission_classes=[AllowAny])
def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]:
def export(self, request: HttpRequest) -> StreamingHttpResponse:
"""
Export observations as CSV using batch processing with improved error handling.
Export observations as CSV using streaming response with improved error handling
and performance optimizations.
"""
temp_file = None
export_format = request.query_params.get("export_format", "csv").lower()

try:
# Validate export format
if request.query_params.get("export_format", "csv").lower() != "csv":
# Input validation
if export_format != "csv":
return JsonResponse({"error": "Only CSV export is supported"}, status=400)

# Get user permissions
Expand All @@ -740,95 +872,45 @@ def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]:
user_municipality_ids = set()
is_admin = False

# Create temporary file
temp_file = tempfile.NamedTemporaryFile(mode='w+', newline='', delete=False, suffix='.csv')
writer = csv.writer(temp_file)
writer.writerow(CSV_HEADERS)

# Get filtered queryset with timeout protection
total_count = None
try:
with transaction.atomic(), connection.cursor() as cursor:
cursor.execute('SET statement_timeout TO 30000') # 30 seconds timeout
queryset = self.filter_queryset(self.get_queryset())
total_count = queryset.count()
except Exception as e:
logger.error(f"Error getting total count: {str(e)}")
# Continue with None total_count

# Process in batches with progress tracking
total_processed = 0
successful_records = 0
offset = 0
batch_size = 1000

while True:
try:
# Get batch with timeout protection
with transaction.atomic(), connection.cursor() as cursor:
cursor.execute('SET statement_timeout TO 30000')
batch = list(queryset[offset:offset + batch_size])
if not batch: # No more records
break

# Process batch with retry logic
successful_writes = self.write_batch_to_file(writer, batch, is_admin, user_municipality_ids)
successful_records += successful_writes

batch_count = len(batch)
total_processed += batch_count
offset += batch_size

# Log progress if we know the total
if total_count:
progress = (total_processed / total_count) * 100
logger.info(f"Export progress: {progress:.1f}% ({total_processed}/{total_count})")
else:
logger.info(f"Processed {total_processed} records")

except Exception as e:
logger.error(f"Error processing batch at offset {offset}: {str(e)}")
offset += batch_size # Skip problematic batch
continue

# Ensure all data is written
temp_file.flush()
# Get filtered queryset
queryset = self.filter_queryset(self.get_queryset())

# Create response
try:
response = FileResponse(
open(temp_file.name, 'rb'),
content_type='text/csv',
as_attachment=True,
filename=f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)

# Log export statistics
if total_count:
logger.info(f"Export completed: {successful_records} successful records out of {total_count} total")
else:
logger.info(f"Export completed: {successful_records} successful records")

# Clean up temp file after sending
response.close = lambda: os.unlink(temp_file.name)
return response

except Exception as e:
logger.exception("Error creating response")
if temp_file and os.path.exists(temp_file.name):
os.unlink(temp_file.name)
return JsonResponse({"error": "Error creating export file"}, status=500)
# Create streaming response
response = StreamingHttpResponse(
streaming_content=self.create_csv_generator(
queryset=queryset,
is_admin=is_admin,
user_municipality_ids=user_municipality_ids
),
content_type='text/csv'
)

# Set headers
filename = f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
response['Content-Disposition'] = f'attachment; filename="{filename}"'
response['X-Accel-Buffering'] = 'no' # Disable nginx buffering

return response

except QueryTimeoutError:
logger.exception("Query timeout during export")
return JsonResponse(
{"error": "Export timed out. Please try with a smaller date range or fewer filters."},
status=503
)
except ExportError as e:
logger.exception("Export error")
return JsonResponse(
{"error": f"Export failed: {str(e)}. Please try again or contact support."},
status=500
)
except Exception as e:
logger.exception("Export failed")
# Clean up temp file in case of error
if temp_file and os.path.exists(temp_file.name):
os.unlink(temp_file.name)
logger.exception("Unexpected error during export")
return JsonResponse(
{"error": "Export failed. Please try again or contact support."},
{"error": "An unexpected error occurred. Please try again or contact support."},
status=500
)

def get_status(self, observation: Observation) -> str:
"""Determine observation status based on eradication data."""
logger.debug("Getting status for observation %s", observation.eradication_result)
Expand Down

0 comments on commit 6a7f93a

Please sign in to comment.