-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
3.11.0 Using redis as jobstore will cause a large number of abnormal processes and eventually cause memory overflow #996
Comments
Try again in English please. |
root 9434 8777 9 13:21 pts/0 00:00:03 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=6, pipe_handle=28) --multiprocessing-fork |
Still not English. Unless you rewrite your issue in English, I will close it. I cannot read Chinese. |
I used gunicorn to start flask service, and used aps 3.11.0,redis as jobstore, after addjob there would be a lot of abnormal processes, and eventually caused memory overflow APS_TZ = "Asia/Shanghai" Executor 配置executors = { Job 默认配置job_defaults = { Redis jobstore 配置redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB, jobstores 字典配置jobstores = {'redis': redis_jobstore} BackgroundScheduler 配置bg_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) 使用 Flask-APScheduler 的 APScheduler 配置scheduler = APScheduler(scheduler=bg_scheduler) 定时任务的函数def task_1(): 添加任务scheduler.add_job( 添加任务scheduler.add_job( 添加任务scheduler.add_job( 添加任务scheduler.add_job( 添加任务scheduler.add_job( 添加任务scheduler.add_job( 启动调度器scheduler.start() |
By "abnormal" processes, do you mean the processes from |
This is what the exception process looks like: root 9434 8777 9 13:21 pts/0 00:00:03 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=6, pipe_handle=28) --multiprocessing-fork |
I'll give you a screenshot of my usage |
There should only be a maximum of 5 subprocesses, as defined by the configuration of |
I'll try to repeat it with a minimal example, and I'll send it to you later |
from datetime import datetime app = Flask(name) APS_TZ = "Asia/Shanghai" Executor 配置executors = { Job 默认配置job_defaults = { Redis jobstore 配置redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB, jobstores 字典配置jobstores = {'redis': redis_jobstore} def heart_beat(): if not scheduler.get_job(id=f"{heart_beat.name}"): scheduler.start() def task1(): @app.route('/', methods=['GET']) if name == "main": 1.Write the above code to app.py and replace it with your own redis configuration
|
Is the problem only reproducible with |
I'm not sure, because by running schedule.start() alone, it ends immediately, and I just start in flask and execute this addjob in the interface |
That's because you're using |
I have verified that it has nothing to do with flask. I will send you the code
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from flask_apscheduler import APScheduler
from flask import Flask, jsonify
app = Flask(__name__)
APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15
executors = {
'default': ThreadPoolExecutor(max_workers=10),
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': APS_JDE_COALESCE,
'max_instances': APS_JDE_MAX_INSTANCES,
"misfire_grace_time": APS_JDE_MFG_TIME
}
redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB,
host=APS_JST_RDS_HOST,
port=APS_JST_RDS_PORT,
password=APS_JST_RDS_PWD)
jobstores = {'redis': redis_jobstore}
bg_scheduler = BlockingScheduler()
app.config.update({
"SCHEDULER_JOBSTORES": jobstores,
"SCHEDULER_EXECUTORS": executors,
"SCHEDULER_JOB_DEFAULTS": job_defaults,
"SCHEDULER_TIMEZONE": APS_TZ
})
scheduler = APScheduler(scheduler=bg_scheduler, app=app)
def heart_beat():
print(f"now: {datetime.now()}")
if not scheduler.get_job(id=f"{heart_beat.__name__}"):
scheduler.add_job(
id=f"{heart_beat.__name__}",
replace_existing=True,
func=heart_beat,
trigger=CronTrigger.from_crontab("*/1 * * * *", timezone=APS_TZ),
jobstore='redis',
)
else:
print(scheduler.get_job(id=f"{heart_beat.__name__}"))
if __name__ == "__main__":
scheduler.start()
from p import scheduler
from apscheduler.triggers.cron import CronTrigger
APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15
def task1():
pass
import time
scheduler.add_job(
id=str(int(time.time())), # 以定时任务id为调度id
func=task1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
start_date="2024-11-28 00:00:00",
end_date="2024-11-29 00:00:00",
jobstore='redis',
executor="processpool",
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER)
|
Have you tested with only the default memory job store? Your issue title says that using specifically the redis job store causes this problem. |
3.10.4 verion is ok |
There is no problem without using redis, version 3.10.4 does not have this problem with redis |
It seems to me that the problem is the way you are starting your Flask application. Notice that in the gunicorn settings you have set gunicorn to use 10 workers. This means that 10 copies of your application will be started, along with 10 schedulers. If the max_workers value in the ProcessPoolExecutor class is 5 by default, then when you start 10 workers from your Flask application, that means you will have 5x10 scheduler processes, which is a total of 50 scheduler processes alone. |
The example still relies on Flask. Flask-apscheduler is not supported here. The example isn't exactly minimal either. |
Ok, maybe there is this problem with flask. At present, I have gone back to version 3.10.4 and will not have this problem. Moreover, I used the stace command to see that aps link redis has been blocked and then waiting for timeout |
No, he'll add an infinite pile, more than 50 |
Why is my example not reproducing your problem then? I asked to test without Flask (and flask-apscheduler) to eliminate them as a source of problems. APScheduler 3.11 switched to using |
Things to check first
I have checked that my issue does not already have a solution in the FAQ
I have searched the existing issues and didn't find my bug already reported there
I have checked that my bug is still present in the latest release
Version
3.11.0
What happened?
我使用gunicorn 启动 flask 服务, 并且使用aps 3.11.0 ,redis作为jobstore, addjob之后会出现大量的异常进程,最终导致内存溢出
How can we reproduce the bug?
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from flask_apscheduler import APScheduler
APS_TZ = "Asia/Shanghai"
APS_JDE_COALESCE = True
APS_JDE_MAX_INSTANCES = 3
APS_JDE_MFG_TIME = 5
APS_JST_RDS_DB = 12
APS_JST_RDS_HOST = "192.168.0.17"
APS_JST_RDS_PORT = 46379
APS_JST_RDS_PWD = "Redis#2022@Cms"
APS_JDE_JITTER = 15
Executor 配置
executors = {
'default': ThreadPoolExecutor(max_workers=10),
'processpool': ProcessPoolExecutor(max_workers=5)
}
Job 默认配置
job_defaults = {
'coalesce': APS_JDE_COALESCE,
'max_instances': APS_JDE_MAX_INSTANCES,
"misfire_grace_time": APS_JDE_MFG_TIME
}
Redis jobstore 配置
redis_jobstore = RedisJobStore(db=APS_JST_RDS_DB,
host=APS_JST_RDS_HOST,
port=APS_JST_RDS_PORT,
password=APS_JST_RDS_PWD)
jobstores 字典配置
jobstores = {'redis': redis_jobstore}
BackgroundScheduler 配置
bg_scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
使用 Flask-APScheduler 的 APScheduler 配置
scheduler = APScheduler(scheduler=bg_scheduler)
定时任务的函数
def task_1():
print("Task 1 is running")
添加任务
scheduler.add_job(
id="1", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
添加任务
scheduler.add_job(
id="14", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
添加任务
scheduler.add_job(
id="111111111", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
添加任务
scheduler.add_job(
id="1111", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
添加任务
scheduler.add_job(
id="11", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
添加任务
scheduler.add_job(
id="12", # 定时任务 ID
func=task_1,
replace_existing=True,
trigger=CronTrigger.from_crontab("* * * * *", timezone=APS_TZ),
jobstore='redis', # 使用 redis jobstore
executor="processpool", # 使用 processpool executor
misfire_grace_time=APS_JDE_MFG_TIME,
jitter=APS_JDE_JITTER
)
启动调度器
scheduler.start()
The text was updated successfully, but these errors were encountered: