-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathworkflow.py
169 lines (137 loc) · 5.1 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from django.db import models
from django.db.transaction import atomic
from django.urls import reverse
from tasks.models.queue import Queue
from tasks.models.queue import QueueItem
from tasks.models.task import Task
from tasks.models.task import TaskBase
# ----------------------
# - Workflows and tasks.
# ----------------------
class TaskWorkflow(Queue):
    """Workflow of ordered Tasks.

    The workflow's tasks are linked to it through TaskItem instances, whose
    positions define the ordering. The workflow's own title and description
    live on its summary Task (see
    ``TaskWorkflowTemplate.create_task_workflow``).
    """

    summary_task = models.OneToOneField(
        Task,
        on_delete=models.PROTECT,
    )
    """Provides task-like filtering and display capabilities for this
    workflow."""

    creator_template = models.ForeignKey(
        "tasks.TaskWorkflowTemplate",
        null=True,
        on_delete=models.SET_NULL,
    )
    """The template from which this workflow was created, if any."""

    def get_tasks(self) -> models.QuerySet:
        """Get a QuerySet of the Tasks associated through their TaskItem
        instances to this TaskWorkflow, ordered by the position of the
        TaskItem."""
        return Task.objects.filter(taskitem__queue=self).order_by("taskitem__position")

    def __str__(self) -> str:
        """Return the title of the summary task as the workflow's display
        name."""
        # This model declares no `title` field of its own: the title given at
        # creation time is stored on the summary task, so read it from there.
        # NOTE(review): assumes the Queue base class does not itself provide
        # `title` - confirm against tasks.models.queue.
        return self.summary_task.title
class TaskItem(QueueItem):
    """Queue entry that ties one Task (expected to always be a subtask) to the
    TaskWorkflow it belongs to."""

    # Owning workflow; the workflow reaches its items through `queue_items`.
    queue = models.ForeignKey(
        TaskWorkflow,
        on_delete=models.CASCADE,
        related_name="queue_items",
    )

    # The Task instance managed by this TaskItem.
    task = models.OneToOneField("tasks.Task", on_delete=models.CASCADE)
# ----------------------------------------
# - Template workflows and template tasks.
# ----------------------------------------
class TaskWorkflowTemplate(Queue):
    """Blueprint from which TaskWorkflow instances are created."""

    # Name of the template itself. A TaskWorkflow generated from the template
    # is given its own title, which is independent of this one.
    title = models.CharField(max_length=255)

    # What the template is for. A TaskWorkflow generated from the template may
    # carry its own description, which is independent of this one.
    description = models.TextField(
        help_text="Description of what this workflow template is used for. ",
        blank=True,
    )

    def get_task_templates(self) -> models.QuerySet:
        """Return the TaskTemplates linked to this template through
        TaskItemTemplate instances, ordered by TaskItemTemplate position."""
        linked_templates = TaskTemplate.objects.filter(taskitemtemplate__queue=self)
        return linked_templates.order_by("taskitemtemplate__position")

    @atomic
    def create_task_workflow(self, title: str, description: str) -> "TaskWorkflow":
        """Create a workflow and its subtasks from this template.

        A summary Task is created from `title` and `description`, then one
        Task and one TaskItem are created per TaskItemTemplate of this
        template, copying the task template's title, description and category
        and the item template's position.
        """
        workflow = TaskWorkflow.objects.create(
            summary_task=Task.objects.create(title=title, description=description),
            creator_template=self,
        )

        item_templates = TaskItemTemplate.objects.select_related("task_template")
        for item_template in item_templates.filter(queue=self):
            source = item_template.task_template
            subtask = Task.objects.create(
                title=source.title,
                description=source.description,
                category=source.category,
            )
            TaskItem.objects.create(
                position=item_template.position,
                queue=workflow,
                task=subtask,
            )

        return workflow

    def get_url(self, action: str = "detail"):
        """Return the URL of this template's UI view for `action` ("detail",
        "edit" or "delete"); any other action yields "#NOT-IMPLEMENTED"."""
        route_names = {
            "detail": "workflow:task-workflow-template-ui-detail",
            "edit": "workflow:task-workflow-template-ui-update",
            "delete": "workflow:task-workflow-template-ui-delete",
        }
        route_name = route_names.get(action)
        if route_name is None:
            return "#NOT-IMPLEMENTED"
        return reverse(route_name, kwargs={"pk": self.pk})
class TaskItemTemplate(QueueItem):
    """Queue entry that ties one TaskTemplate to the TaskWorkflowTemplate it
    belongs to."""

    # Owning workflow template; it reaches its items through `queue_items`.
    queue = models.ForeignKey(
        TaskWorkflowTemplate,
        on_delete=models.CASCADE,
        related_name="queue_items",
    )

    # The TaskTemplate instance managed by this item.
    task_template = models.OneToOneField(
        "tasks.TaskTemplate",
        on_delete=models.CASCADE,
    )
class TaskTemplate(TaskBase):
    """Blueprint for the Task instances created when a workflow is generated
    from a template workflow."""

    def get_url(self, action: str = "detail"):
        """Return the URL of this task template's UI view for `action`
        ("detail" or "edit"); any other action yields "#NOT-IMPLEMENTED"."""
        route_names = {
            "detail": "workflow:task-template-ui-detail",
            "edit": "workflow:task-template-ui-update",
        }
        route_name = route_names.get(action)
        if route_name is None:
            return "#NOT-IMPLEMENTED"
        return reverse(route_name, kwargs={"pk": self.pk})

    def __str__(self):
        """Use the template's title as its display name."""
        return self.title