Skip to content

Commit 419f0f7

Browse files
committed
Split handle method in several smaller ones and add "--force" option
1 parent 0f252fe commit 419f0f7

File tree

1 file changed

+159
-116
lines changed

1 file changed

+159
-116
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# coding: utf-8
2+
from __future__ import annotations
3+
24
from collections import defaultdict
35
from datetime import timedelta
46

@@ -41,30 +43,31 @@ def add_arguments(self, parser):
4143
),
4244
)
4345

46+
parser.add_argument(
47+
'-f', '--force',
48+
action='store_true',
49+
default=False,
50+
help='Recalculate counters for every user. Default is False',
51+
)
52+
4453
def handle(self, *args, **kwargs):
45-
chunks = kwargs['chunks']
4654
days = kwargs['days']
47-
verbosity = kwargs['verbosity']
48-
55+
self._chunks = kwargs['chunks']
56+
self._force = kwargs['force']
57+
self._verbosity = kwargs['verbosity']
4958
today = timezone.now().date()
5059
delta = timedelta(days=days)
5160
date_threshold = today - delta
5261
# We want to take the first day of the month to get accurate count for
5362
# monthly counters
54-
date_threshold = date_threshold.replace(day=1)
55-
if verbosity >= 1:
63+
self._date_threshold = date_threshold.replace(day=1)
64+
if self._verbosity >= 1:
5665
self.stdout.write(
5766
f'Daily and monthly counters will be (re)calculated '
58-
f'since {date_threshold.strftime("%Y-%m-%d UTC")}'
67+
f'since {self._date_threshold.strftime("%Y-%m-%d UTC")}'
5968
)
6069

61-
# Release any locks on the users' profile from getting submissions
62-
UserProfile.objects.all().update(
63-
metadata=ReplaceValues(
64-
'metadata',
65-
updates={'submissions_suspended': False},
66-
),
67-
)
70+
self.release_old_locks()
6871

6972
# Get profiles whose users' submission counters have not been updated yet.
7073
subquery = UserProfile.objects.values_list('user_id', flat=True).filter(
@@ -75,116 +78,156 @@ def handle(self, *args, **kwargs):
7578
User.objects.only('username')
7679
.exclude(pk=settings.ANONYMOUS_USER_ID)
7780
.exclude(pk__in=subquery)
78-
.iterator(chunk_size=chunks)
81+
.iterator(chunk_size=self._chunks)
7982
):
80-
if verbosity >= 1:
83+
if self._verbosity >= 1:
8184
self.stdout.write(f'Processing user {user.username}...')
8285

83-
# Retrieve or create user's profile.
84-
(
85-
user_profile,
86-
created,
87-
) = UserProfile.objects.get_or_create(user_id=user.pk)
86+
self.suspend_submissions_for_user(user)
8887

89-
# Some old profiles don't have metadata
90-
if user_profile.metadata is None:
91-
user_profile.metadata = {}
88+
with transaction.atomic():
9289

93-
# Set the flag `submissions_suspended` to true if it is not already.
94-
if not user_profile.metadata.get('submissions_suspended'):
95-
# We are using the flag `submissions_suspended` to prevent
96-
# new submissions from coming in while the
97-
# counters are being calculated.
98-
user_profile.metadata['submissions_suspended'] = True
99-
user_profile.save(update_fields=['metadata'])
90+
self.clean_old_data(user)
10091

101-
with transaction.atomic():
92+
for xf in user.xforms.only('pk', 'id_string').iterator(chunk_size=self._chunks):
10293

103-
# First delete only records covered by desired max days.
104-
if verbosity >= 2:
105-
self.stdout.write(f'\tDeleting old data...')
106-
DailyXFormSubmissionCounter.objects.filter(
107-
xform__user_id=user.pk, date__gte=date_threshold
108-
).delete()
109-
110-
# Because we don't have a real date field on `MonthlyXFormSubmissionCounter`
111-
# but we need to cast `year` and `month` as a date field to
112-
# compare it with `date_threshold`
113-
MonthlyXFormSubmissionCounter.objects.annotate(
114-
date=Cast(
115-
Concat(
116-
F('year'), Value('-'), F('month'), Value('-'), 1
117-
),
118-
DateField(),
119-
)
120-
).filter(user_id=user.pk, date__gte=date_threshold).delete()
121-
122-
for xf in user.xforms.only('pk').iterator(chunk_size=chunks):
123-
124-
daily_counters = []
125-
monthly_counters = []
126-
total_submissions = defaultdict(int)
127-
128-
for values in (
129-
xf.instances.filter(
130-
date_created__date__gte=date_threshold
131-
)
132-
.values('date_created__date')
133-
.annotate(num_of_submissions=Count('pk'))
134-
.order_by('date_created__date')
135-
):
136-
submission_date = values['date_created__date']
137-
daily_counters.append(DailyXFormSubmissionCounter(
138-
xform_id=xf.pk,
139-
date=submission_date,
140-
counter=values['num_of_submissions'],
141-
))
142-
key = (
143-
f'{submission_date.year}-{submission_date.month}'
94+
if self._verbosity >= 2:
95+
self.stdout.write(
96+
f'\tProcessing XForm {xf.id_string} #{xf.id}'
14497
)
145-
total_submissions[key] += values['num_of_submissions']
14698

147-
if daily_counters:
148-
if verbosity >= 2:
149-
self.stdout.write(f'\tInserting daily counters data...')
150-
DailyXFormSubmissionCounter.objects.bulk_create(
151-
daily_counters, batch_size=chunks
152-
)
153-
elif verbosity >= 2:
154-
self.stdout.write(f'\tNo daily counters data...')
155-
156-
for key, total in total_submissions.items():
157-
year, month = key.split('-')
158-
monthly_counters.append(MonthlyXFormSubmissionCounter(
159-
year=year,
160-
month=month,
161-
xform_id=xf.pk,
162-
user_id=user.pk,
163-
counter=total,
164-
))
165-
166-
if monthly_counters:
167-
if verbosity >= 2:
168-
self.stdout.write(f'\tInserting monthly counters data...')
169-
MonthlyXFormSubmissionCounter.objects.bulk_create(
170-
monthly_counters, batch_size=chunks
171-
)
172-
elif verbosity >= 2:
173-
self.stdout.write(f'\tNo monthly counters data!')
174-
175-
# Update user's profile (and lock the related row)
176-
updates = {
177-
'submissions_suspended': False,
178-
'counters_updates_status': 'complete',
179-
}
180-
UserProfile.objects.filter(
181-
user_id=user.pk
182-
).update(
183-
metadata=ReplaceValues(
184-
'metadata',
185-
updates=updates,
186-
),
187-
)
188-
189-
if verbosity >= 1:
99+
daily_counters, total_submissions = self.build_counters(xf)
100+
self.add_daily_counters(daily_counters)
101+
self.add_monthly_counters(total_submissions, xf, user)
102+
103+
self.update_user_profile(user)
104+
105+
if self._verbosity >= 1:
190106
self.stdout.write(f'Done!')
107+
108+
def add_daily_counters(self, daily_counters: list):
109+
if daily_counters:
110+
if self._verbosity >= 2:
111+
self.stdout.write(f'\tInserting daily counters data...')
112+
DailyXFormSubmissionCounter.objects.bulk_create(
113+
daily_counters, batch_size=self._chunks
114+
)
115+
elif self._verbosity >= 2:
116+
self.stdout.write(f'\tNo daily counters data...')
117+
118+
def add_monthly_counters(
119+
self, total_submissions: dict, xform: 'logger.XForm', user: 'auth.User'
120+
):
121+
monthly_counters = []
122+
123+
for key, total in total_submissions.items():
124+
year, month = key.split('-')
125+
monthly_counters.append(MonthlyXFormSubmissionCounter(
126+
year=year,
127+
month=month,
128+
xform_id=xform.pk,
129+
user_id=user.pk,
130+
counter=total,
131+
))
132+
133+
if monthly_counters:
134+
if self._verbosity >= 2:
135+
self.stdout.write(f'\tInserting monthly counters data...')
136+
MonthlyXFormSubmissionCounter.objects.bulk_create(
137+
monthly_counters, batch_size=self._chunks
138+
)
139+
elif self._verbosity >= 2:
140+
self.stdout.write(f'\tNo monthly counters data!')
141+
142+
def build_counters(self, xf: 'logger.XForm') -> tuple[list, dict]:
143+
daily_counters = []
144+
total_submissions = defaultdict(int)
145+
146+
for values in (
147+
xf.instances.filter(
148+
date_created__date__gte=self._date_threshold
149+
)
150+
.values('date_created__date')
151+
.annotate(num_of_submissions=Count('pk'))
152+
.order_by('date_created__date')
153+
):
154+
submission_date = values['date_created__date']
155+
daily_counters.append(DailyXFormSubmissionCounter(
156+
xform_id=xf.pk,
157+
date=submission_date,
158+
counter=values['num_of_submissions'],
159+
))
160+
key = (
161+
f'{submission_date.year}-{submission_date.month}'
162+
)
163+
total_submissions[key] += values['num_of_submissions']
164+
165+
return daily_counters, total_submissions
166+
167+
def clean_old_data(self, user: 'auth.User'):
168+
# First delete only records covered by desired max days.
169+
if self._verbosity >= 2:
170+
self.stdout.write(f'\tDeleting old data...')
171+
DailyXFormSubmissionCounter.objects.filter(
172+
xform__user_id=user.pk, date__gte=self._date_threshold
173+
).delete()
174+
175+
# Because we don't have a real date field on `MonthlyXFormSubmissionCounter`
176+
# but we need to cast `year` and `month` as a date field to
177+
# compare it with `self._date_threshold`
178+
MonthlyXFormSubmissionCounter.objects.annotate(
179+
date=Cast(
180+
Concat(
181+
F('year'), Value('-'), F('month'), Value('-'), 1
182+
),
183+
DateField(),
184+
)
185+
).filter(user_id=user.pk, date__gte=self._date_threshold).delete()
186+
187+
def suspend_submissions_for_user(self, user: 'auth.User'):
188+
# Retrieve or create user's profile.
189+
(
190+
user_profile,
191+
created,
192+
) = UserProfile.objects.get_or_create(user_id=user.pk)
193+
194+
# Some old profiles don't have metadata
195+
if user_profile.metadata is None:
196+
user_profile.metadata = {}
197+
198+
# Set the flag `submissions_suspended` to true if it is not already.
199+
if not user_profile.metadata.get('submissions_suspended'):
200+
# We are using the flag `submissions_suspended` to prevent
201+
# new submissions from coming in while the
202+
# counters are being calculated.
203+
user_profile.metadata['submissions_suspended'] = True
204+
user_profile.save(update_fields=['metadata'])
205+
206+
def release_old_locks(self):
207+
updates = {'submissions_suspended': False}
208+
209+
if self._force:
210+
updates['counters_updates_status'] = 'not-complete'
211+
212+
# Release any locks on the users' profile from getting submissions
213+
UserProfile.objects.all().update(
214+
metadata=ReplaceValues(
215+
'metadata',
216+
updates=updates,
217+
),
218+
)
219+
220+
def update_user_profile(self, user: 'auth.User'):
221+
# Update user's profile (and lock the related row)
222+
updates = {
223+
'submissions_suspended': False,
224+
'counters_updates_status': 'complete',
225+
}
226+
UserProfile.objects.filter(
227+
user_id=user.pk
228+
).update(
229+
metadata=ReplaceValues(
230+
'metadata',
231+
updates=updates,
232+
),
233+
)

0 commit comments

Comments
 (0)