24
24
from django_fsm import transition
25
25
26
26
from common .models .mixins import TimestampedMixin
27
+ from common .util import TableLock
27
28
from notifications .models import EnvelopeAcceptedNotification
28
29
from notifications .models import EnvelopeReadyForProcessingNotification
29
30
from notifications .models import EnvelopeRejectedNotification
@@ -124,6 +125,7 @@ class PackagedWorkBasketInvalidQueueOperation(Exception):
124
125
125
126
class PackagedWorkBasketManager (Manager ):
126
127
@atomic
128
+ @TableLock .acquire_lock ("publishing.PackagedWorkBasket" , lock = TableLock .EXCLUSIVE )
127
129
def create (self , workbasket : WorkBasket , ** kwargs ):
128
130
"""Create a new instance, associating with workbasket."""
129
131
if workbasket .status in WorkflowStatus .unchecked_statuses ():
@@ -437,6 +439,7 @@ def begin_processing_condition_no_instances_currently_processing(self) -> bool:
437
439
438
440
return not PackagedWorkBasket .objects .currently_processing ()
439
441
442
+ @atomic
440
443
@pop_top_after
441
444
@save_after
442
445
@transition (
@@ -466,6 +469,7 @@ def begin_processing(self):
466
469
multiple instances it's necessary for this method to perform a save()
467
470
operation upon successful transitions.
468
471
"""
472
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
469
473
self .processing_started_at = datetime .now ()
470
474
self .save ()
471
475
@@ -621,7 +625,9 @@ def pop_top(self) -> "PackagedWorkBasket":
621
625
"because it is not at position 1." ,
622
626
)
623
627
624
- PackagedWorkBasket .objects .filter (position__gt = 0 ).update (
628
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).filter (
629
+ position__gt = 0 ,
630
+ ).update (
625
631
position = F ("position" ) - 1 ,
626
632
)
627
633
self .refresh_from_db ()
@@ -638,6 +644,10 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
638
644
Management of the queued instance's `processing_state` is not altered by
639
645
this function and should be managed separately by the caller.
640
646
"""
647
+
648
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
649
+ self .refresh_from_db ()
650
+
641
651
if self .position == 0 :
642
652
raise PackagedWorkBasketInvalidQueueOperation (
643
653
"Unable to remove instance with a position value of 0 from "
@@ -648,7 +658,9 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
648
658
self .position = 0
649
659
self .save ()
650
660
651
- PackagedWorkBasket .objects .filter (position__gt = current_position ).update (
661
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).filter (
662
+ position__gt = current_position ,
663
+ ).update (
652
664
position = F ("position" ) - 1 ,
653
665
)
654
666
self .refresh_from_db ()
@@ -661,17 +673,21 @@ def promote_to_top_position(self) -> "PackagedWorkBasket":
661
673
"""Promote the instance to the top position of the package processing
662
674
queue so that it occupies position 1."""
663
675
664
- if self .position == 1 :
676
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
677
+ self .refresh_from_db ()
678
+
679
+ if self .position <= 1 :
665
680
return self
666
681
667
682
position = self .position
668
683
669
- PackagedWorkBasket .objects .filter (
684
+ PackagedWorkBasket .objects .select_for_update ( nowait = True ). filter (
670
685
Q (position__gte = 1 ) & Q (position__lt = position ),
671
686
).update (position = F ("position" ) + 1 )
672
687
673
688
self .position = 1
674
689
self .save ()
690
+ self .refresh_from_db ()
675
691
676
692
return self
677
693
@@ -681,10 +697,15 @@ def promote_position(self) -> "PackagedWorkBasket":
681
697
"""Promote the instance by one position up the package processing
682
698
queue."""
683
699
684
- if self . position == 1 :
685
- return
700
+ PackagedWorkBasket . objects . select_for_update ( nowait = True ). get ( pk = self . pk )
701
+ self . refresh_from_db ()
686
702
687
- obj_to_swap = PackagedWorkBasket .objects .get (position = self .position - 1 )
703
+ if self .position <= 1 :
704
+ return self
705
+
706
+ obj_to_swap = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
707
+ position = self .position - 1 ,
708
+ )
688
709
obj_to_swap .position += 1
689
710
self .position -= 1
690
711
PackagedWorkBasket .objects .bulk_update (
@@ -701,10 +722,15 @@ def demote_position(self) -> "PackagedWorkBasket":
701
722
"""Demote the instance by one position down the package processing
702
723
queue."""
703
724
704
- if self . position == PackagedWorkBasket .objects .max_position ():
705
- return
725
+ PackagedWorkBasket .objects .select_for_update ( nowait = True ). get ( pk = self . pk )
726
+ self . refresh_from_db ()
706
727
707
- obj_to_swap = PackagedWorkBasket .objects .get (position = self .position + 1 )
728
+ if self .position in {0 , PackagedWorkBasket .objects .max_position ()}:
729
+ return self
730
+
731
+ obj_to_swap = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
732
+ position = self .position + 1 ,
733
+ )
708
734
obj_to_swap .position -= 1
709
735
self .position += 1
710
736
PackagedWorkBasket .objects .bulk_update (
0 commit comments