Skip to content

Commit 41509ba

Browse files
authored
fix: minor tweaks and validation handling (#1009)
* fix: fix state check on paused/failed
* fix: tidy validation and logging
* fix: switched to singular container on session response
* fix: minor tweaks
* fix: minor tweaks to logging and some validation
* chore: pin min/max version for RTN
1 parent 2523993 commit 41509ba

File tree

15 files changed

+488
-395
lines changed

15 files changed

+488
-395
lines changed

Dockerfile

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ RUN apk add --no-cache \
3838
rclone \
3939
unzip \
4040
gcc \
41+
ffmpeg \
4142
musl-dev \
4243
libffi-dev \
4344
python3-dev \

makefile

+4
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ hard_reset: clean
106106
install:
107107
@poetry install --with dev
108108

109+
update:
110+
@poetry cache clear PyPI --all
111+
@poetry update
112+
109113
# Run the application
110114
run:
111115
@poetry run python src/main.py

poetry.lock

+372-255
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/program/db/db_functions.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ def run_thread_with_db_item(fn, service, program, event: Event, cancellation_eve
504504
if event.content_item:
505505
indexed_item = next(fn(event.content_item), None)
506506
if indexed_item is None:
507-
logger.debug(f"Unable to index {event.content_item.log_string}")
507+
logger.debug(f"Unable to index {event.content_item.log_string if event.content_item.log_string is not None else event.content_item.imdb_id}")
508508
return None
509509
indexed_item.store_state()
510510
session.add(indexed_item)

src/program/managers/event_manager.py

+1-90
Original file line numberDiff line numberDiff line change
@@ -206,95 +206,6 @@ def submit_job(self, service, program, event=None):
206206
websocket_manager.publish("event_update", self.get_event_updates())
207207
future.add_done_callback(lambda f:self._process_future(f, service))
208208

209-
# For debugging purposes we can monitor the execution time of the service. (comment out above and uncomment below)
210-
# def submit_job(self, service, program, event=None):
211-
# """
212-
# Submits a job to be executed by the service.
213-
214-
# Args:
215-
# service (type): The service class to execute.
216-
# program (Program): The program containing the service.
217-
# item (Event, optional): The event item to process. Defaults to None.
218-
# """
219-
# log_message = f"Submitting service {service.__name__} to be executed"
220-
# if event:
221-
# log_message += f" with {event.log_message}"
222-
# logger.debug(log_message)
223-
224-
# cancellation_event = threading.Event()
225-
# executor = self._find_or_create_executor(service)
226-
227-
# # Add start time to track execution duration
228-
# start_time = datetime.now()
229-
230-
# def _monitor_execution(future):
231-
# """Monitor execution time and log if taking too long"""
232-
# while not future.done():
233-
# execution_time = (datetime.now() - start_time).total_seconds()
234-
# if execution_time > 180: # 3 minutes
235-
# current_thread = None
236-
# for thread in threading.enumerate():
237-
# if thread.name.startswith(service.__name__) and not thread.name.endswith('_monitor'):
238-
# current_thread = thread
239-
# break
240-
241-
# if current_thread:
242-
# # Get stack frames for the worker thread
243-
# frames = sys._current_frames()
244-
# thread_frame = None
245-
# for thread_id, frame in frames.items():
246-
# if thread_id == current_thread.ident:
247-
# thread_frame = frame
248-
# break
249-
250-
# if thread_frame:
251-
# stack_trace = ''.join(traceback.format_stack(thread_frame))
252-
# else:
253-
# stack_trace = "Could not get stack trace for worker thread"
254-
# else:
255-
# stack_trace = "Could not find worker thread"
256-
257-
# logger.warning(
258-
# f"Service {service.__name__} execution taking longer than 3 minutes!\n"
259-
# f"Event: {event.log_message if event else 'No event'}\n"
260-
# f"Execution time: {execution_time:.1f} seconds\n"
261-
# f"Thread name: {current_thread.name if current_thread else 'Unknown'}\n"
262-
# f"Thread alive: {current_thread.is_alive() if current_thread else 'Unknown'}\n"
263-
# f"Stack trace:\n{stack_trace}"
264-
# )
265-
266-
# # Cancel the future and kill the thread
267-
# future.cancellation_event.set()
268-
# future.cancel()
269-
# if current_thread:
270-
# logger.warning(f"Killing thread {current_thread.name} due to timeout")
271-
# self._futures.remove(future)
272-
# if event:
273-
# self.remove_event_from_running(event)
274-
# return # Exit the monitoring thread
275-
276-
# time.sleep(60) # Check every minute
277-
278-
# future = executor.submit(db_functions.run_thread_with_db_item,
279-
# program.all_services[service].run,
280-
# service, program, event, cancellation_event)
281-
282-
# # Start monitoring thread
283-
# monitor_thread = threading.Thread(
284-
# target=_monitor_execution,
285-
# args=(future,),
286-
# name=f"{service.__name__}_monitor",
287-
# daemon=True
288-
# )
289-
# monitor_thread.start()
290-
291-
# future.cancellation_event = cancellation_event
292-
# if event:
293-
# future.event = event
294-
# self._futures.append(future)
295-
# sse_manager.publish("event_update", self.get_event_updates())
296-
# future.add_done_callback(lambda f: self._process_future(f, service))
297-
298209
def cancel_job(self, item_id: str, suppress_logs=False):
299210
"""
300211
Cancels a job associated with the given item.
@@ -343,7 +254,7 @@ def next(self) -> Event:
343254
if self._queued_events:
344255
with self.mutex:
345256
self._queued_events.sort(key=lambda event: event.run_at)
346-
if datetime.now() >= self._queued_events[0].run_at:
257+
if self._queued_events and datetime.now() >= self._queued_events[0].run_at:
347258
event = self._queued_events.pop(0)
348259
return event
349260
raise Empty

src/program/media/item.py

+8
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,10 @@ def get_season_index_by_id(self, item_id):
486486
return None
487487

488488
def _determine_state(self):
489+
if all(season.state == States.Paused for season in self.seasons):
490+
return States.Paused
491+
if all(season.state == States.Failed for season in self.seasons):
492+
return States.Failed
489493
if all(season.state == States.Completed for season in self.seasons):
490494
return States.Completed
491495
if any(season.state in [States.Ongoing, States.Unreleased] for season in self.seasons):
@@ -595,6 +599,10 @@ def __init__(self, item):
595599

596600
def _determine_state(self):
597601
if len(self.episodes) > 0:
602+
if all(episode.state == States.Paused for episode in self.episodes):
603+
return States.Paused
604+
if all(episode.state == States.Failed for episode in self.episodes):
605+
return States.Failed
598606
if all(episode.state == States.Completed for episode in self.episodes):
599607
return States.Completed
600608
if any(episode.state == States.Unreleased for episode in self.episodes):

src/program/program.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -247,18 +247,18 @@ def _update_ongoing(self) -> None:
247247
def _schedule_functions(self) -> None:
248248
"""Schedule each service based on its update interval."""
249249
scheduled_functions = {
250-
self._update_ongoing: {"interval": 60 * 60 * 24},
250+
self._update_ongoing: {"interval": 60 * 60 * 4},
251251
self._retry_library: {"interval": 60 * 60 * 24},
252252
log_cleaner: {"interval": 60 * 60},
253253
vacuum_and_analyze_index_maintenance: {"interval": 60 * 60 * 24},
254254
}
255255

256256
if settings_manager.settings.symlink.repair_symlinks:
257-
# scheduled_functions[fix_broken_symlinks] = {
258-
# "interval": 60 * 60 * settings_manager.settings.symlink.repair_interval,
259-
# "args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path]
260-
# }
261-
logger.warning("Symlink repair is disabled, this will be re-enabled in the future.")
257+
scheduled_functions[fix_broken_symlinks] = {
258+
"interval": 60 * 60 * settings_manager.settings.symlink.repair_interval,
259+
"args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path]
260+
}
261+
# logger.warning("Symlink repair is disabled, this will be re-enabled in the future.")
262262

263263
for func, config in scheduled_functions.items():
264264
self.scheduler.add_job(

src/program/services/downloaders/__init__.py

+11-6
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ def validate(self):
4343
def run(self, item: MediaItem):
4444
logger.debug(f"Starting download process for {item.log_string} ({item.id})")
4545

46-
if item.is_parent_blocked():
47-
logger.debug(f"Skipping {item.log_string} ({item.id}) as it has a blocked parent, or is a blocked item")
48-
yield item
49-
5046
if item.active_stream or item.last_state in [States.Completed, States.Symlinked, States.Downloaded]:
5147
logger.debug(f"Skipping {item.log_string} ({item.id}) as it has already been downloaded by another download session")
5248
yield item
5349

50+
if item.is_parent_blocked():
51+
logger.debug(f"Skipping {item.log_string} ({item.id}) as it has a blocked parent, or is a blocked item")
52+
yield item
53+
5454
if not item.streams:
5555
logger.debug(f"No streams available for {item.log_string} ({item.id})")
5656
yield item
@@ -96,18 +96,23 @@ def validate_stream(self, stream: Stream, item: MediaItem) -> Optional[TorrentCo
9696

9797
valid_files = []
9898
for file in container.files or []:
99+
if isinstance(file, DebridFile):
100+
valid_files.append(file)
101+
continue
102+
99103
try:
100104
debrid_file = DebridFile.create(
101105
filename=file.filename,
102106
filesize_bytes=file.filesize,
103107
filetype=item.type,
104108
file_id=file.file_id
105109
)
110+
111+
if isinstance(debrid_file, DebridFile):
112+
valid_files.append(debrid_file)
106113
except InvalidDebridFileException as e:
107114
logger.debug(f"{stream.infohash}: {e}")
108115
continue
109-
if debrid_file:
110-
valid_files.append(debrid_file)
111116

112117
if valid_files:
113118
container.files = valid_files

src/program/services/downloaders/models.py

+15
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ def create(
7777

7878
return cls(filename=filename, filesize=filesize_bytes, file_id=file_id)
7979

80+
def to_dict(self) -> Dict[str, Union[int, str]]:
81+
"""Convert the DebridFile to a dictionary"""
82+
return {
83+
"filename": self.filename,
84+
"filesize": self.filesize,
85+
"file_id": self.file_id
86+
}
87+
8088

8189
class ParsedFileData(BaseModel):
8290
"""Represents a parsed file from a filename"""
@@ -100,6 +108,13 @@ def file_ids(self) -> List[int]:
100108
"""Get the file ids of the cached files"""
101109
return [file.file_id for file in self.files if file.file_id is not None]
102110

111+
def to_dict(self) -> Dict[str, Union[str, Dict]]:
112+
"""Convert the TorrentContainer to a dictionary including the infohash"""
113+
return {
114+
"infohash": self.infohash,
115+
"files": {file.file_id: file.to_dict() for file in self.files if file}
116+
}
117+
103118

104119
class TorrentInfo(BaseModel):
105120
"""Torrent information from a debrid service"""

src/program/services/downloaders/realdebrid.py

+36-22
Original file line numberDiff line numberDiff line change
@@ -132,17 +132,15 @@ def _validate_premium(self) -> bool:
132132

133133
def get_instant_availability(self, infohash: str, item_type: str) -> Optional[TorrentContainer]:
134134
"""
135-
Get instant availability for multiple infohashes.
135+
Get instant availability for a single infohash.
136136
Creates a makeshift availability check since Real-Debrid no longer supports instant availability.
137137
"""
138-
valid_container: Optional[TorrentContainer] = None
138+
container: Optional[TorrentContainer] = None
139139
torrent_id = None
140140

141141
try:
142142
torrent_id = self.add_torrent(infohash)
143143
container = self._process_torrent(torrent_id, infohash, item_type)
144-
if container:
145-
valid_container = container
146144
except InvalidDebridFileException as e:
147145
logger.debug(f"{infohash}: {e}")
148146
except Exception as e:
@@ -151,12 +149,23 @@ def get_instant_availability(self, infohash: str, item_type: str) -> Optional[To
151149
if torrent_id is not None:
152150
self.delete_torrent(torrent_id)
153151

154-
return valid_container
152+
return container
155153

156154
def _process_torrent(self, torrent_id: str, infohash: str, item_type: str) -> Optional[TorrentContainer]:
157155
"""Process a single torrent and return a TorrentContainer if valid."""
158156
torrent_info = self.get_torrent_info(torrent_id)
159-
157+
if not torrent_info:
158+
logger.debug(f"No torrent info found for {torrent_id} with infohash {infohash}")
159+
return None
160+
161+
if not torrent_info.files:
162+
logger.debug(f"No files found in torrent {torrent_id} with infohash {infohash}")
163+
return None
164+
165+
if torrent_info.status in ("magnet_error", "error", "virus", "dead"):
166+
logger.debug(f"Torrent {torrent_id} with infohash {infohash} is invalid. Torrent status on Real-Debrid: {torrent_info.status}")
167+
return None
168+
160169
if torrent_info.status == "waiting_files_selection":
161170
video_file_ids = [
162171
file_id for file_id, file_info in torrent_info.files.items()
@@ -171,25 +180,30 @@ def _process_torrent(self, torrent_id: str, infohash: str, item_type: str) -> Op
171180
torrent_info = self.get_torrent_info(torrent_id)
172181

173182
if torrent_info.status != "downloaded":
174-
logger.debug(f"Torrent {torrent_id} with infohash {infohash} is not cached")
183+
logger.debug(f"Torrent {torrent_id} with infohash {infohash} is not cached.")
175184
return None
176185

177-
if not torrent_info.files:
178-
return None
179-
180-
torrent_files = [
181-
file for file in (
182-
DebridFile.create(
183-
file_info["filename"],
184-
file_info["bytes"],
185-
item_type,
186-
file_id
186+
torrent_files = []
187+
for file_id, file_info in torrent_info.files.items():
188+
try:
189+
debrid_file = DebridFile.create(
190+
filename=file_info["filename"],
191+
filesize_bytes=file_info["bytes"],
192+
filetype=item_type,
193+
file_id=file_id
187194
)
188-
for file_id, file_info in torrent_info.files.items()
189-
) if file is not None
190-
]
191195

192-
return TorrentContainer(infohash=infohash, files=torrent_files) if torrent_files else None
196+
if isinstance(debrid_file, DebridFile):
197+
torrent_files.append(debrid_file)
198+
except InvalidDebridFileException as e:
199+
logger.debug(f"{infohash}: {e}")
200+
continue
201+
202+
if not torrent_files:
203+
logger.debug(f"No valid files found after validating files in torrent {torrent_id} with infohash {infohash}")
204+
return None
205+
206+
return TorrentContainer(infohash=infohash, files=torrent_files)
193207

194208
def add_torrent(self, infohash: str) -> str:
195209
"""Add a torrent by infohash"""
@@ -223,7 +237,7 @@ def get_torrent_info(self, torrent_id: str) -> TorrentInfo:
223237
"""Get information about a torrent"""
224238
try:
225239
data = self.api.request_handler.execute(HttpMethod.GET, f"torrents/info/{torrent_id}")
226-
files = {file["id"]: {"filename": file["path"].split("/")[-1], "bytes": file["bytes"]} for file in data["files"]}
240+
files = {file["id"]: {"filename": file["path"].split("/")[-1], "bytes": file["bytes"], "selected": file["selected"]} for file in data["files"]}
227241
return TorrentInfo(
228242
id=data["id"],
229243
name=data["filename"],

src/program/services/post_processing/__init__.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from program.media.item import MediaItem, Movie, Show
99
from program.media.state import States
1010
from program.services.post_processing.subliminal import Subliminal
11+
# from program.services.post_processing.ffmpeg import VideoValidator
1112
from program.settings.manager import settings_manager
1213
from program.utils.notifications import notify_on_complete
1314

@@ -18,17 +19,20 @@ def __init__(self):
1819
self.initialized = False
1920
self.settings = settings_manager.settings.post_processing
2021
self.services = {
21-
Subliminal: Subliminal()
22+
Subliminal: Subliminal(),
23+
# VideoValidator: VideoValidator()
2224
}
2325
self.initialized = True
2426

2527
def run(self, item: MediaItem):
26-
if Subliminal.should_submit(item):
27-
self.services[Subliminal].run(item)
28+
for service in self.services.values():
29+
if service.initialized and service.should_submit(item):
30+
service.run(item)
2831
if item.last_state == States.Completed:
2932
clear_streams(item)
3033
yield item
3134

35+
3236
def notify(item: MediaItem):
3337
show = None
3438
if item.type in ["show", "movie"]:
@@ -49,4 +53,4 @@ def _notify(_item: Show | Movie):
4953
duration = round((datetime.now() - _item.requested_at).total_seconds())
5054
logger.success(f"{_item.log_string} has been completed in {duration} seconds.")
5155
if settings_manager.settings.notifications.enabled:
52-
notify_on_complete(_item)
56+
notify_on_complete(_item)

0 commit comments

Comments (0)