31
31
from notifications .models import EnvelopeRejectedNotification
32
32
from notifications .models import NotificationLog
33
33
from publishing import models as publishing_models
34
+ from publishing .models .decorators import refresh_after
34
35
from publishing .models .decorators import save_after
35
36
from publishing .models .decorators import skip_notifications_if_disabled
36
37
from publishing .models .state import ProcessingState
@@ -437,6 +438,7 @@ def begin_processing_condition_at_position_1(self) -> bool:
437
438
"""Django FSM condition: Instance must be at position 1 in order to
438
439
complete the begin_processing transition to CURRENTLY_PROCESSING."""
439
440
441
+ self .refresh_from_db (fields = ["position" ])
440
442
return self .position == 1
441
443
442
444
def begin_processing_condition_no_instances_currently_processing (self ) -> bool :
@@ -476,7 +478,9 @@ def begin_processing(self):
476
478
multiple instances it's necessary for this method to perform a save()
477
479
operation upon successful transitions.
478
480
"""
479
- PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
481
+ PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
482
+ pk = self .pk ,
483
+ )
480
484
self .processing_started_at = make_aware (datetime .now ())
481
485
self .save ()
482
486
@@ -618,6 +622,7 @@ def cds_notified_notification_log(self) -> NotificationLog:
618
622
619
623
@atomic
620
624
@create_envelope_on_new_top
625
+ @refresh_after
621
626
def pop_top (self ) -> "PackagedWorkBasket" :
622
627
"""
623
628
Pop the top-most instance, shuffling all remaining queued instances
@@ -626,23 +631,34 @@ def pop_top(self) -> "PackagedWorkBasket":
626
631
Management of the popped instance's `processing_state` is not altered by
627
632
this function and should be managed separately by the caller.
628
633
"""
629
- if self .position != 1 :
634
+
635
+ instance = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
636
+ pk = self .pk ,
637
+ )
638
+
639
+ if instance .position != 1 :
630
640
raise PackagedWorkBasketInvalidQueueOperation (
631
- "Unable to pop instance at position {self .position} in queue "
641
+ "Unable to pop instance at position {instance .position} in queue "
632
642
"because it is not at position 1." ,
633
643
)
634
644
635
- PackagedWorkBasket .objects .select_for_update (nowait = True ).filter (
636
- position__gt = 0 ,
637
- ).update (
645
+ instance .position = 0
646
+ instance .save ()
647
+
648
+ to_update = list (
649
+ PackagedWorkBasket .objects .select_for_update (nowait = True )
650
+ .filter (position__gt = 1 )
651
+ .values_list ("pk" , flat = True ),
652
+ )
653
+ PackagedWorkBasket .objects .filter (pk__in = to_update ).update (
638
654
position = F ("position" ) - 1 ,
639
655
)
640
- self .refresh_from_db ()
641
656
642
- return self
657
+ return instance
643
658
644
659
@atomic
645
660
@create_envelope_on_new_top
661
+ @refresh_after
646
662
def remove_from_queue (self ) -> "PackagedWorkBasket" :
647
663
"""
648
664
Remove instance from the queue, shuffling all successive queued
@@ -652,98 +668,111 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
652
668
this function and should be managed separately by the caller.
653
669
"""
654
670
655
- PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
656
- self .refresh_from_db ()
671
+ instance = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
672
+ pk = self .pk ,
673
+ )
657
674
658
- if self .position == 0 :
675
+ if instance .position == 0 :
659
676
raise PackagedWorkBasketInvalidQueueOperation (
660
677
"Unable to remove instance with a position value of 0 from "
661
678
"queue because 0 indicates that it is not a queue member." ,
662
679
)
663
680
664
- current_position = self .position
665
- self .position = 0
666
- self .save ()
681
+ current_position = instance .position
682
+ instance .position = 0
683
+ instance .save ()
667
684
668
- PackagedWorkBasket .objects .select_for_update (nowait = True ).filter (
669
- position__gt = current_position ,
670
- ).update (
685
+ to_update = list (
686
+ PackagedWorkBasket .objects .select_for_update (nowait = True )
687
+ .filter (position__gt = current_position )
688
+ .values_list ("pk" , flat = True ),
689
+ )
690
+ PackagedWorkBasket .objects .filter (pk__in = to_update ).update (
671
691
position = F ("position" ) - 1 ,
672
692
)
673
- self .refresh_from_db ()
674
693
675
- return self
694
+ return instance
676
695
677
696
@atomic
678
697
@create_envelope_on_new_top
698
+ @refresh_after
679
699
def promote_to_top_position (self ) -> "PackagedWorkBasket" :
680
700
"""Promote the instance to the top position of the package processing
681
701
queue so that it occupies position 1."""
682
702
683
- PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
684
- self .refresh_from_db ()
703
+ instance = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
704
+ pk = self .pk ,
705
+ )
685
706
686
- if self .position <= 1 :
687
- return self
707
+ if instance .position <= 1 :
708
+ return instance
688
709
689
- position = self .position
710
+ current_position = instance .position
690
711
691
- PackagedWorkBasket .objects .select_for_update (nowait = True ).filter (
692
- Q (position__gte = 1 ) & Q (position__lt = position ),
693
- ).update (position = F ("position" ) + 1 )
712
+ to_update = list (
713
+ PackagedWorkBasket .objects .select_for_update (nowait = True )
714
+ .filter (Q (position__gte = 1 ) & Q (position__lt = current_position ))
715
+ .values_list ("pk" , flat = True ),
716
+ )
717
+ PackagedWorkBasket .objects .filter (pk__in = to_update ).update (
718
+ position = F ("position" ) + 1 ,
719
+ )
694
720
695
- self .position = 1
696
- self .save ()
697
- self .refresh_from_db ()
721
+ instance .position = 1
722
+ instance .save ()
698
723
699
- return self
724
+ return instance
700
725
701
726
@atomic
702
727
@create_envelope_on_new_top
728
+ @refresh_after
703
729
def promote_position (self ) -> "PackagedWorkBasket" :
704
730
"""Promote the instance by one position up the package processing
705
731
queue."""
706
732
707
- PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
708
- self .refresh_from_db ()
733
+ instance = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
734
+ pk = self .pk ,
735
+ )
709
736
710
- if self .position <= 1 :
711
- return self
737
+ if instance .position <= 1 :
738
+ return instance
712
739
713
740
obj_to_swap = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
714
- position = self .position - 1 ,
741
+ position = instance .position - 1 ,
715
742
)
716
743
obj_to_swap .position += 1
717
- self .position -= 1
744
+ instance .position -= 1
745
+
718
746
PackagedWorkBasket .objects .bulk_update (
719
- [self , obj_to_swap ],
747
+ [instance , obj_to_swap ],
720
748
["position" ],
721
749
)
722
- self .refresh_from_db ()
723
750
724
- return self
751
+ return instance
725
752
726
753
@atomic
727
754
@create_envelope_on_new_top
755
+ @refresh_after
728
756
def demote_position (self ) -> "PackagedWorkBasket" :
729
757
"""Demote the instance by one position down the package processing
730
758
queue."""
731
759
732
- PackagedWorkBasket .objects .select_for_update (nowait = True ).get (pk = self .pk )
733
- self .refresh_from_db ()
760
+ instance = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
761
+ pk = self .pk ,
762
+ )
734
763
735
- if self .position in {0 , PackagedWorkBasket .objects .max_position ()}:
736
- return self
764
+ if instance .position in {0 , PackagedWorkBasket .objects .max_position ()}:
765
+ return instance
737
766
738
767
obj_to_swap = PackagedWorkBasket .objects .select_for_update (nowait = True ).get (
739
- position = self .position + 1 ,
768
+ position = instance .position + 1 ,
740
769
)
741
770
obj_to_swap .position -= 1
742
- self .position += 1
771
+ instance .position += 1
772
+
743
773
PackagedWorkBasket .objects .bulk_update (
744
- [self , obj_to_swap ],
774
+ [instance , obj_to_swap ],
745
775
["position" ],
746
776
)
747
- self .refresh_from_db ()
748
777
749
- return self
778
+ return instance
0 commit comments