From b90568f2165afc508e24b799e710d0fefe686ede Mon Sep 17 00:00:00 2001 From: Reto Kaiser Date: Wed, 14 May 2014 13:27:15 +0200 Subject: [PATCH 1/3] Add interruptable ProcessWorker --- .../wms/process/InterruptibleProcess.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/ch/cargomedia/wms/process/InterruptibleProcess.java diff --git a/src/ch/cargomedia/wms/process/InterruptibleProcess.java b/src/ch/cargomedia/wms/process/InterruptibleProcess.java new file mode 100644 index 0000000..1646c68 --- /dev/null +++ b/src/ch/cargomedia/wms/process/InterruptibleProcess.java @@ -0,0 +1,86 @@ +package ch.cargomedia.wms.process; + +import com.wowza.wms.logging.WMSLoggerFactory; +import org.apache.commons.lang.StringUtils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +public class InterruptibleProcess extends Thread { + + private String[] _command; + private BufferedReader _processReader = null; + private Boolean _interrupted = false; + + private String _output = null; + private Integer _exitCode = null; + private Exception _exception = null; + + public InterruptibleProcess(String[] command) { + _command = command; + } + + public void run() { + try { + ProcessBuilder builder = new ProcessBuilder(_command); + builder.redirectErrorStream(true); + Process _process = builder.start(); + + _processReader = new BufferedReader(new InputStreamReader(_process.getInputStream())); + String line; + _output = ""; + while ((line = _processReader.readLine()) != null) { + _output += line + "\n"; + } + + if (_interrupted) { + throw new InterruptedException(); + } + + _exitCode = _process.waitFor(); + if (0 != _exitCode) { + throw new Exception(String.format("Command exited with code `%s`. \nCommand: %s \nOutput: \n%s", + _process.exitValue(), StringUtils.join(_command, " "), _output)); + } + } catch (Exception e) { + _exception = e; + } + } + + @Override + public void interrupt() { + _interrupted = true; + this._closeProcessReader(); + super.interrupt(); + } + + public String getOutput() { + return _output; + } + + public Integer getExitCode() { + return _exitCode; + } + + public Exception getException() { + return _exception; + } + + public void throwExceptionIfAny() throws Exception { + if (null != _exception) { + throw _exception; + } + } + + private void _closeProcessReader() { + if (null != _processReader) { + try { + _processReader.close(); + _processReader = null; + } catch (IOException e) { + WMSLoggerFactory.getLogger(null).error("Cannot close process reader: " + e.getMessage()); + } + } + } +} From 52a14592224eeb877b51a6c62af94c8048315472 Mon Sep 17 00:00:00 2001 From: Reto Kaiser Date: Wed, 14 May 2014 14:11:23 +0200 Subject: [PATCH 2/3] Add interruptable ProcessSequence --- .../wms/process/ProcessSequence.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 src/ch/cargomedia/wms/process/ProcessSequence.java diff --git a/src/ch/cargomedia/wms/process/ProcessSequence.java b/src/ch/cargomedia/wms/process/ProcessSequence.java new file mode 100644 index 0000000..0b9634e --- /dev/null +++ b/src/ch/cargomedia/wms/process/ProcessSequence.java @@ -0,0 +1,35 @@ +package ch.cargomedia.wms.process; + +import java.util.ArrayList; + +public class ProcessSequence { + + private ArrayList _commandList = new ArrayList(); + private InterruptibleProcess _processWorker = null; + private Boolean _interrupted = false; + + public void addCommand(String[] command) { + _commandList.add(command); + } + + public void runAll() throws Exception { + for (String[] command : _commandList) { + if (_interrupted) { + throw new InterruptedException(); + } + _processWorker = new InterruptibleProcess(command); + _processWorker.run(); + _processWorker.join(0); + _processWorker.throwExceptionIfAny(); + _processWorker = null; + } + _commandList.clear(); + } + + public void interrupt() { + _interrupted = true; + if (null != _processWorker) { + _processWorker.interrupt(); + } + } +} From 21e82ba84c71b15b4ceb954fd9da90a44f4d781a Mon Sep 17 00:00:00 2001 From: Reto Kaiser Date: Wed, 14 May 2014 14:14:04 +0200 Subject: [PATCH 3/3] Use processSequence for thumbnailer and archiver --- src/ch/cargomedia/wms/Utils.java | 24 ------------------- .../module/eventhandler/StreamListener.java | 2 +- .../cargomedia/wms/transcoder/Archiver.java | 10 ++++---- .../wms/transcoder/Thumbnailer.java | 24 ++++++++++++------- 4 files changed, 22 insertions(+), 38 deletions(-) diff --git a/src/ch/cargomedia/wms/Utils.java b/src/ch/cargomedia/wms/Utils.java index a573ff3..90ff205 100644 --- a/src/ch/cargomedia/wms/Utils.java +++ b/src/ch/cargomedia/wms/Utils.java @@ -1,10 +1,6 @@ package ch.cargomedia.wms; -import org.apache.commons.lang.StringUtils; - -import java.io.BufferedReader; import java.io.File; -import java.io.InputStreamReader; import java.util.UUID; public class Utils { @@ -26,24 +22,4 @@ public static File getTempFile() { return getTempFile(null); } - public static String exec(String[] command) throws Exception { - ProcessBuilder builder = new ProcessBuilder(command); - builder.redirectErrorStream(true); - Process process = builder.start(); - - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - String line; - String output = ""; - while ((line = reader.readLine()) != null) { - output += line + "\n"; - } - - if (process.waitFor() != 0) { - throw new Exception(String.format("Command exited with code `%s`. \nCommand: %s \nOutput: \n%s", - process.exitValue(), StringUtils.join(command, " "), output)); - } - - return output; - } - } diff --git a/src/ch/cargomedia/wms/module/eventhandler/StreamListener.java b/src/ch/cargomedia/wms/module/eventhandler/StreamListener.java index 84e37ba..2dd10f8 100644 --- a/src/ch/cargomedia/wms/module/eventhandler/StreamListener.java +++ b/src/ch/cargomedia/wms/module/eventhandler/StreamListener.java @@ -140,7 +140,7 @@ private void _onUnpublish(IMediaStream stream) throws Exception { VideostreamPublisher videostreamPublisher = videostreamPublishList.get(stream.getName()); if (_thumbnailer != null) { - _thumbnailer.cancel(); + _thumbnailer.interrupt(); } if (videostreamPublisher != null) { diff --git a/src/ch/cargomedia/wms/transcoder/Archiver.java b/src/ch/cargomedia/wms/transcoder/Archiver.java index e6e6939..f13c6cd 100644 --- a/src/ch/cargomedia/wms/transcoder/Archiver.java +++ b/src/ch/cargomedia/wms/transcoder/Archiver.java @@ -2,6 +2,7 @@ import ch.cargomedia.wms.Application; import ch.cargomedia.wms.Utils; +import ch.cargomedia.wms.process.ProcessSequence; import ch.cargomedia.wms.stream.VideostreamPublisher; import com.wowza.wms.logging.WMSLoggerFactory; @@ -9,6 +10,8 @@ public class Archiver extends Thread { + private ProcessSequence _processSequence = new ProcessSequence(); + private VideostreamPublisher _stream; private File _input; private String _pathBinCm; @@ -23,8 +26,7 @@ public Archiver(VideostreamPublisher stream) { public void run() { File output = Utils.getTempFile("mp4"); try { - - Utils.exec(new String[]{ + _processSequence.addCommand(new String[]{ "ffmpeg", "-threads", "1", "-i", _input.getAbsolutePath(), @@ -35,14 +37,14 @@ public void run() { "-loglevel", "warning", output.getAbsolutePath(), }); - - Utils.exec(new String[]{ + _processSequence.addCommand(new String[]{ _pathBinCm, "stream", "import-video-archive", String.valueOf(_stream.getStreamChannelId()), output.getAbsolutePath(), }); + _processSequence.runAll(); } catch (Exception e) { WMSLoggerFactory.getLogger(null).error("Cannot create archive: " + e.getMessage()); diff --git a/src/ch/cargomedia/wms/transcoder/Thumbnailer.java b/src/ch/cargomedia/wms/transcoder/Thumbnailer.java index 31b8778..0352075 100644 --- a/src/ch/cargomedia/wms/transcoder/Thumbnailer.java +++ b/src/ch/cargomedia/wms/transcoder/Thumbnailer.java @@ -2,6 +2,7 @@ import ch.cargomedia.wms.Application; import ch.cargomedia.wms.Utils; +import ch.cargomedia.wms.process.ProcessSequence; import ch.cargomedia.wms.stream.VideostreamPublisher; import com.wowza.wms.logging.WMSLoggerFactory; import com.wowza.wms.stream.IMediaStream; @@ -11,17 +12,18 @@ public class Thumbnailer extends TimerTask { + private ProcessSequence _processSequence = new ProcessSequence(); + private VideostreamPublisher _stream; private String _input; private String _pathBinCm; private int _width; private int _height; - private IMediaStream _mediaStream; + public Thumbnailer(VideostreamPublisher stream, IMediaStream mediaStream) { Application application = Application.getInstance(); _stream = stream; - _mediaStream = mediaStream; _input = "rtmp://127.0.0.1/" + application.getName() + "/" + stream.getStreamName(); _pathBinCm = application.getConfig().getCmBinPath(); _width = application.getConfig().getThumbnailWidth(); @@ -31,8 +33,7 @@ public Thumbnailer(VideostreamPublisher stream, IMediaStream mediaStream) { public void run() { File output = Utils.getTempFile("png"); try { - - Utils.exec(new String[]{ + _processSequence.addCommand(new String[]{ "ffmpeg", "-threads", "1", "-i", _input, @@ -45,20 +46,25 @@ public void run() { "-loglevel", "warning", output.getAbsolutePath(), }); - - Utils.exec(new String[]{ + _processSequence.addCommand(new String[]{ _pathBinCm, "stream", "import-video-thumbnail", String.valueOf(_stream.getStreamChannelId()), output.getAbsolutePath(), }); + _processSequence.runAll(); + } catch (InterruptedException e) { + WMSLoggerFactory.getLogger(null).info("Thumbnail creation interrupted."); } catch (Exception e) { - if (_mediaStream.isOpen()) { - WMSLoggerFactory.getLogger(null).error("Cannot create thumbnail: " + e.getMessage()); - } + WMSLoggerFactory.getLogger(null).error("Cannot create thumbnail: " + e.getMessage()); } output.delete(); } + + public void interrupt() { + this.cancel(); + _processSequence.interrupt(); + } }