initial watchdog #505

Merged
merged 37 commits into from
Feb 9, 2020
Changes from 2 commits
f5e877f
initial watchdog
cehbrecht Dec 16, 2019
7821562
fix tests
cehbrecht Dec 16, 2019
dba42b4
removed unused finally clause
cehbrecht Dec 18, 2019
b2402c1
simplified processes load
cehbrecht Dec 18, 2019
3a93271
reverted to NullPool
cehbrecht Dec 18, 2019
0e5a0de
removed flufl.enum
cehbrecht Dec 18, 2019
15b759e
pep8
cehbrecht Dec 18, 2019
4ad9b27
renamed watchdog to jobqueue
cehbrecht Dec 18, 2019
869f3dc
fixed jobqueue config
cehbrecht Dec 18, 2019
2c7cbea
updated docs
cehbrecht Dec 18, 2019
c22a0a6
fixed parallelprocesses
cehbrecht Dec 18, 2019
1826c3e
update service description
cehbrecht Dec 18, 2019
e2046b3
using pywps.log
cehbrecht Dec 18, 2019
d5f08d1
only log when queue has changed
cehbrecht Dec 18, 2019
e6d4a28
fix pep8
cehbrecht Dec 18, 2019
e666640
Introduces Alembic database management
jachym Dec 31, 2019
62d6eec
Merge pull request #1 from jachym/alembic
cehbrecht Jan 2, 2020
d73b63c
updated alembic
cehbrecht Jan 2, 2020
a6173fa
fix tests on travis
cehbrecht Jan 2, 2020
97e36f3
fix travis
cehbrecht Jan 2, 2020
086b124
fix travis
cehbrecht Jan 2, 2020
66a61a3
fix travis
cehbrecht Jan 2, 2020
2deb839
fix pep8
cehbrecht Jan 2, 2020
7c38fc7
using pywps -c pywps.cfg
cehbrecht Jan 2, 2020
0c1b97a
fix travis
cehbrecht Jan 2, 2020
e310fff
fix imports
cehbrecht Jan 2, 2020
166ade0
fix imports
cehbrecht Jan 2, 2020
c620c0f
fixed header
cehbrecht Jan 9, 2020
617d600
test python 3.7
cehbrecht Jan 10, 2020
935197d
fix travis for python 3.7
cehbrecht Jan 10, 2020
ffd350c
fix travis
cehbrecht Jan 10, 2020
1d0d3d8
allow overwrite of pywps config
cehbrecht Jan 10, 2020
d42a4f0
fixed typo
cehbrecht Jan 10, 2020
df041a9
fix migrate cli
cehbrecht Jan 10, 2020
8474ce7
fixed log message
cehbrecht Jan 14, 2020
2099b6e
using only default database prefix
cehbrecht Jan 14, 2020
c8ac7dd
added drop_db to migrate
cehbrecht Jan 23, 2020
2 changes: 2 additions & 0 deletions .travis.yml
@@ -5,6 +5,7 @@ dist: trusty

python:
- "3.6"
# - "2.7"

git:
submodules: false
@@ -37,6 +38,7 @@ script:
- python -m unittest tests
- coverage run --source=pywps -m unittest tests
- flake8 pywps/
- flake8 bin/

after_success:
- coveralls
15 changes: 14 additions & 1 deletion docs/configuration.rst
@@ -195,8 +195,18 @@ configuration file <https://docs.pycsw.org/en/latest/configuration.html>`_.
directory of the GRASS GIS installation, referred to as `GISBASE
<https://grass.osgeo.org/grass73/manuals/variables.html>`_

[daemon]
--------

:pidfile:
location of the PID file for the PyWPS daemon running in the background,
``/var/run/pywps-daemon.pid``
:pause:
pause in seconds between periodic checks for new stored requests

[s3]
----

:bucket:
Name of the bucket to store files in. e.g. ``my-wps-results``

@@ -269,10 +279,13 @@ Sample file
[grass]
gisbase=/usr/local/grass-7.3.svn/

[daemon]
pidfile=/var/run/pywps-daemon.pid
pause=30

[s3]
bucket=my-org-wps
region=us-east-1
prefix=appname/coolapp/
public=true
encrypt=false
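
As a rough illustration of how the `[daemon]` options above might be consumed, here is a minimal sketch using Python's standard `configparser`. It is not the PyWPS configuration loader: the function name `read_daemon_settings` and the fallback values are assumptions made only for this example.

```python
import configparser

# Sample text mirroring the [daemon] section from the sample file above.
SAMPLE_CFG = """
[daemon]
pidfile=/var/run/pywps-daemon.pid
pause=30
"""


def read_daemon_settings(text):
    """Return (pidfile, pause_seconds) parsed from a configuration string.

    The fallback values are illustrative assumptions, not PyWPS defaults.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    pidfile = parser.get("daemon", "pidfile", fallback="/tmp/pywps-daemon.pid")
    pause = parser.getint("daemon", "pause", fallback=30)
    return pidfile, pause


pidfile, pause = read_daemon_settings(SAMPLE_CFG)
print(pidfile, pause)
```

Reading `pause` with `getint` means a non-numeric value fails at load time rather than later inside the polling loop.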

51 changes: 51 additions & 0 deletions docs/deployment.rst
@@ -3,6 +3,47 @@
Deployment to a production server
=================================

PyWPS consists of two main parts: the PyWPS :py:class:`pywps.app.Service` class
and the :py:mod:`bin.pywps_daemon` script.

The second part, :py:mod:`bin.pywps_daemon`, runs as a background daemon
process. It is responsible for executing stored requests in asynchronous
mode.

--------------
Service module
--------------
The :py:class:`pywps.app.Service` class is responsible for *synchronous*
request execution: GetCapabilities, DescribeProcess and Execute in synchronous
mode. For each request, the Service class will:

1. Accept the request.
2. Handle it according to the execution mode:

   a. If the request is to be executed in *synchronous* mode, it is executed
      immediately.
   b. If the request is to be executed in *asynchronous* mode, it is stored
      in the database.

This means that asynchronous requests are not executed by the Service class;
they are only stored in the database.

-----------------
`pywps_daemon.py`
-----------------
The :py:mod:`bin.pywps_daemon` script has to be started separately. It starts
a daemon process that periodically checks the database for stored requests.
When a stored request is found, it is removed from the database and executed.

The daemon starts only as many processes as the `maxparallel` limit allows.

The daemon does not accept requests from clients; it only watches the
database and starts process jobs.

------------
Installation
------------
As already described in the :ref:`installation` section, no specific deployment
procedure is needed for PyWPS when using the Flask-based server. However, this setup is not
intended to be used in a production environment. For production, `Apache httpd
@@ -11,6 +52,8 @@ more advised. PyWPS is runs as a `WSGI
<https://wsgi.readthedocs.io/en/latest/>`_ application on those servers. PyWPS
relies on the `Werkzeug <http://werkzeug.pocoo.org/>`_ library for this purpose.

The daemon then has to be started as well.

Deploying an individual PyWPS instance
--------------------------------------

@@ -89,6 +132,7 @@ example of a PyWPS WSGI script::
The configuration is described in the next chapter (:ref:`configuration`),
as well as process creation and deployment (:ref:`process`).


Deployment on Apache2 httpd server
----------------------------------

@@ -232,6 +276,13 @@ And to check that everything is ok::

Todo: nginx + uWSGI

---------------
Daemon starting
---------------
The daemon has to be started from the command line::

    export PYWPS_CFG=/path/to/configuration/pywps.cfg
    pywps_daemon.py start

.. _deployment-testing:

88 changes: 24 additions & 64 deletions pywps/app/Process.py
@@ -7,7 +7,6 @@
 import os
 import sys
 import traceback
-import json
 import shutil

 from pywps import dblog
@@ -61,10 +60,12 @@ def __init__(self, handler, identifier, title, abstract='', keywords=[], profile
self.metadata = metadata
self.profile = profile
self.version = version
self._uuid = None
self.inputs = inputs
self.outputs = outputs
self.uuid = None
self.status_store = None
# self.status_store = None
self._setup_status_storage()
# self.status_location = ''
# self.status_url = ''
self.workdir = None
@@ -111,12 +112,12 @@ def from_json(cls, value):
         module, classname = value['class'].split(':')
         # instantiate subclass of Process
         new_process = getattr(importlib.import_module(module), classname)()
-        new_process._set_uuid(value['uuid'])
+        new_process.uuid = value['uuid']
         new_process.set_workdir(value['workdir'])
         return new_process

     def execute(self, wps_request, uuid):
-        self._set_uuid(uuid)
+        self.uuid = uuid
         self._setup_status_storage()
         self.async_ = False
         response_cls = get_response("execute")
@@ -142,11 +143,16 @@ def execute(self, wps_request, uuid):

         return wps_response

-    def _set_uuid(self, uuid):
+    @property
+    def uuid(self):
+        return self._uuid
+
+    @uuid.setter
+    def uuid(self, uuid):
         """Set uuid and status location path and url
         """

-        self.uuid = uuid
+        self._uuid = uuid
         for inpt in self.inputs:
             inpt.uuid = uuid
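
The switch from `_set_uuid()` to a `uuid` property can be illustrated with a small stand-alone sketch. The classes `Input` and `Job` below are hypothetical stand-ins, not PyWPS classes; they only show how assigning to a property can propagate the value to child objects.

```python
class Input:
    """Hypothetical stand-in for a process input."""

    def __init__(self):
        self.uuid = None


class Job:
    """Hypothetical stand-in for a process that owns several inputs."""

    def __init__(self, inputs):
        self._uuid = None     # backing attribute read by the getter
        self.inputs = inputs  # must exist before the setter is first used

    @property
    def uuid(self):
        return self._uuid

    @uuid.setter
    def uuid(self, value):
        # Store the value and push it down to every input.
        self._uuid = value
        for inpt in self.inputs:
            inpt.uuid = value


job = Job([Input(), Input()])
job.uuid = "1234"
print(job.uuid, [inpt.uuid for inpt in job.inputs])
```

One consequence of this design is an ordering constraint: because the setter iterates over `self.inputs`, that attribute has to be assigned before any `self.uuid = ...` assignment runs.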

@@ -177,7 +183,6 @@ def _execute_process(self, async_, wps_request, wps_response):
         """

         maxparallel = int(config.get_config_value('server', 'parallelprocesses'))
-
         running, stored = dblog.get_process_counts()

         # async
@@ -187,43 +192,26 @@ def _execute_process(self, async_, wps_request, wps_response):
             LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel))
             LOGGER.debug("Stored processes: {}".format(stored))

-            if running < maxparallel or maxparallel == -1:
-                wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
-                LOGGER.debug("Accepted request {}".format(self.uuid))
-                self._run_async(wps_request, wps_response)
-
-            # try to store for later usage
-            else:
-                maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
-                if stored >= maxprocesses:
-                    raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
-                LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
-                dblog.store_process(self.uuid, wps_request)
-                wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0)
+            maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
+            if stored >= maxprocesses:
+                raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
+            LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
+            dblog.store_request(self.uuid, wps_request, self)
+            wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0)

         # not async
         else:
             if running >= maxparallel and maxparallel != -1:
                 raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
             wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
-            wps_response = self._run_process(wps_request, wps_response)
+            wps_response = self.run_process(wps_request, wps_response)

         return wps_response

-    # This function may not raise exception and must return a valid wps_response
-    # Failure must be reported as wps_response.status = WPS_STATUS.FAILED
-    def _run_async(self, wps_request, wps_response):
-        import pywps.processing
-        process = pywps.processing.Process(
-            process=self,
-            wps_request=wps_request,
-            wps_response=wps_response)
-        LOGGER.debug("Starting process for request: {}".format(self.uuid))
-        process.start()
-
-    # This function may not raise exception and must return a valid wps_response
+    # This function may not raise exception
+    # and must return a valid wps_response
     # Failure must be reported as wps_response.status = WPS_STATUS.FAILED
-    def _run_process(self, wps_request, wps_response):
+    def run_process(self, wps_request, wps_response):
         LOGGER.debug("Started processing request: {}".format(self.uuid))
         try:
             self._set_grass(wps_request)
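
To make the control flow of `_execute_process` after this change easier to follow, here is a simplified, self-contained sketch of the admission logic. It is a reading aid, not PyWPS code: the function `admit` and the local `ServerBusy` class are hypothetical stand-ins, and the counters are passed in rather than read from the database.

```python
class ServerBusy(Exception):
    """Local stand-in for the PyWPS ServerBusy exception."""


def admit(async_, running, stored, maxparallel, maxprocesses):
    """Decide what happens to an incoming Execute request.

    Returns "stored" when an asynchronous request is queued for the daemon,
    or "run" when a synchronous request may be executed right away.
    Raises ServerBusy when the relevant limit has been reached.
    """
    if async_:
        # Asynchronous requests are never started here, only queued.
        if stored >= maxprocesses:
            raise ServerBusy("Maximum number of processes in queue reached.")
        return "stored"
    # Synchronous requests run immediately unless the parallel limit is hit;
    # maxparallel == -1 disables the limit, as in the hunk above.
    if maxparallel != -1 and running >= maxparallel:
        raise ServerBusy("Maximum number of parallel running processes reached.")
    return "run"


print(admit(True, running=5, stored=0, maxparallel=2, maxprocesses=30))
```

Note that in this sketch, as in the hunk, the number of running processes no longer affects asynchronous requests at submission time; only the queue length does.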
@@ -273,39 +261,11 @@ def _run_process(self, wps_request, wps_response):
             wps_response._update_status(WPS_STATUS.FAILED, msg, 100)

         finally:
-            # The run of the next pending request if finished here, weather or not it successfull
-            self.launch_next_process()
+            # nothing
+            pass

         return wps_response

-    def launch_next_process(self):
-        """Look at the queue of async process, if the queue is not empty launch the next pending request.
-        """
-        try:
-            LOGGER.debug("Checking for stored requests")
-
-            stored_request = dblog.pop_first_stored()
-            if not stored_request:
-                LOGGER.debug("No stored request found")
-                return
-
-            (uuid, request_json) = (stored_request.uuid, stored_request.request)
-            if not PY2:
-                request_json = request_json.decode('utf-8')
-            LOGGER.debug("Launching the stored request {}".format(str(uuid)))
-            new_wps_request = WPSRequest()
-            new_wps_request.json = json.loads(request_json)
-            process_identifier = new_wps_request.identifier
-            process = self.service.prepare_process_for_execution(process_identifier)
-            process._set_uuid(uuid)
-            process._setup_status_storage()
-            process.async_ = True
-            new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid)
-            new_wps_response.store_status_file = True
-            process._run_async(new_wps_request, new_wps_response)
-        except Exception as e:
-            LOGGER.exception("Could not run stored process. {}".format(e))
-
     def clean(self):
         """Clean the process working dir and other temporary files
         """
21 changes: 15 additions & 6 deletions pywps/app/Service.py
@@ -5,6 +5,7 @@

 import logging
 import tempfile
+import importlib
 from werkzeug.exceptions import HTTPException
 from werkzeug.wrappers import Request, Response
 from pywps._compat import PY2
@@ -41,12 +42,9 @@ class Service(object):
     :param cfgfiles: A list of configuration files
     """

-    def __init__(self, processes=[], cfgfiles=None):
-        # ordered dict of processes
-        self.processes = OrderedDict((p.identifier, p) for p in processes)
-
-        if cfgfiles:
-            config.load_configuration(cfgfiles)
+    def __init__(self, processes=None, cfgfiles=None):
+        processes = processes or []
+        config.load_configuration(cfgfiles)

         if config.get_config_value('logging', 'file') and config.get_config_value('logging', 'level'):
             LOGGER.setLevel(getattr(logging, config.get_config_value('logging', 'level')))
@@ -58,6 +56,17 @@ def __init__(self, processes=[], cfgfiles=None):
         if not LOGGER.handlers:
             LOGGER.addHandler(logging.NullHandler())

+        if not processes:
+            # load processes from processes_module.processes_list
+            pname = config.get_config_value('server', 'processes')
+            if pname:
+                pmodule = '.'.join(pname.split('.')[0:-1])
+                plist = pname.split('.')[-1]
+                processes = getattr(importlib.import_module(pmodule), plist)
+
+        # ordered dict of processes
+        self.processes = OrderedDict((p.identifier, p) for p in processes)
+
     def get_capabilities(self, wps_request, uuid):

         response_cls = response.get_response("capabilities")
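
The new fallback in `Service.__init__` resolves a dotted name such as `processes_module.processes_list` into an object. A minimal sketch of that technique is shown below; the helper name `load_object` is hypothetical, and it uses `str.rpartition` where the hunk uses `split`/`join`. It is demonstrated with a standard-library name because no process module is available here.

```python
import importlib
from collections import OrderedDict


def load_object(dotted_name):
    """Import ``package.module.attr`` and return the ``attr`` object."""
    # Split on the last dot: everything before it is the module path.
    module_name, _, attr_name = dotted_name.rpartition(".")
    if not module_name:
        raise ValueError("expected 'module.attribute', got {!r}".format(dotted_name))
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Resolve a standard-library object by its dotted name.
loaded = load_object("collections.OrderedDict")
print(loaded is OrderedDict)
```

Validating the name before calling `importlib.import_module` gives a clearer error for a value without a dot than an import of an empty module name would.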
5 changes: 4 additions & 1 deletion pywps/app/WPSRequest.py
@@ -348,7 +348,10 @@ def json(self, value):
         self.version = value['version']
         self.language = value['language']
         self.identifier = value['identifier']
-        self.identifiers = value['identifiers']
+        if "identifiers" in value:
+            self.identifiers = value['identifiers']
+        else:
+            self.identifiers = [self.identifier]
         self.store_execute = value['store_execute']
         self.status = value['status']
         self.lineage = value['lineage']
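
The change to the `json` setter makes the `identifiers` key optional. The same fallback can be written compactly with `dict.get`, as in the sketch below; `read_identifiers` is a hypothetical helper for illustration and is not part of `WPSRequest`.

```python
def read_identifiers(value):
    """Return (identifier, identifiers) from a request dictionary.

    When the "identifiers" key is missing, fall back to a one-element
    list holding the single identifier, mirroring the hunk above.
    """
    identifier = value["identifier"]
    identifiers = value.get("identifiers", [identifier])
    return identifier, identifiers


print(read_identifiers({"identifier": "buffer"}))
print(read_identifiers({"identifier": "buffer", "identifiers": ["buffer", "clip"]}))
```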
6 changes: 6 additions & 0 deletions pywps/application.py
@@ -0,0 +1,6 @@
+from pywps.app.Service import Service
+
+
+def make_app(processes=None, cfgfiles=None):
+    app = Service(processes=processes, cfgfiles=cfgfiles)
+    return app