Skip to content
This repository has been archived by the owner on Mar 19, 2018. It is now read-only.

Commit

Permalink
Merge pull request #5 from njam/issue-5
Browse files Browse the repository at this point in the history
Kill thumbnailer on mediaStream close
  • Loading branch information
njam committed May 14, 2014
2 parents c3c66b7 + 21e82ba commit e1a820d
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 38 deletions.
24 changes: 0 additions & 24 deletions src/ch/cargomedia/wms/Utils.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package ch.cargomedia.wms;

import org.apache.commons.lang.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.UUID;

public class Utils {
Expand All @@ -26,24 +22,4 @@ public static File getTempFile() {
return getTempFile(null);
}

public static String exec(String[] command) throws Exception {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
String output = "";
while ((line = reader.readLine()) != null) {
output += line + "\n";
}

if (process.waitFor() != 0) {
throw new Exception(String.format("Command exited with code `%s`. \nCommand: %s \nOutput: \n%s",
process.exitValue(), StringUtils.join(command, " "), output));
}

return output;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void _onUnpublish(IMediaStream stream) throws Exception {
VideostreamPublisher videostreamPublisher = videostreamPublishList.get(stream.getName());

if (_thumbnailer != null) {
_thumbnailer.cancel();
_thumbnailer.interrupt();
}

if (videostreamPublisher != null) {
Expand Down
86 changes: 86 additions & 0 deletions src/ch/cargomedia/wms/process/InterruptibleProcess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package ch.cargomedia.wms.process;

import com.wowza.wms.logging.WMSLoggerFactory;
import org.apache.commons.lang.StringUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class InterruptibleProcess extends Thread {

private String[] _command;
private BufferedReader _processReader = null;
private Boolean _interrupted = false;

private String _output = null;
private Integer _exitCode = null;
private Exception _exception = null;

public InterruptibleProcess(String[] command) {
_command = command;
}

public void run() {
try {
ProcessBuilder builder = new ProcessBuilder(_command);
builder.redirectErrorStream(true);
Process _process = builder.start();

_processReader = new BufferedReader(new InputStreamReader(_process.getInputStream()));
String line;
_output = "";
while ((line = _processReader.readLine()) != null) {
_output += line + "\n";
}

if (_interrupted) {
throw new InterruptedException();
}

_exitCode = _process.waitFor();
if (0 != _exitCode) {
throw new Exception(String.format("Command exited with code `%s`. \nCommand: %s \nOutput: \n%s",
_process.exitValue(), StringUtils.join(_command, " "), _output));
}
} catch (Exception e) {
_exception = e;
}
}

@Override
public void interrupt() {
_interrupted = true;
this._closeProcessReader();
super.interrupt();
}

public String getOutput() {
return _output;
}

public Integer getExitCode() {
return _exitCode;
}

public Exception getException() {
return _exception;
}

public void throwExceptionIfAny() throws Exception {
if (null != _exception) {
throw _exception;
}
}

private void _closeProcessReader() {
if (null != _processReader) {
try {
_processReader.close();
_processReader = null;
} catch (IOException e) {
WMSLoggerFactory.getLogger(null).error("Cannot close process reader: " + e.getMessage());
}
}
}
}
35 changes: 35 additions & 0 deletions src/ch/cargomedia/wms/process/ProcessSequence.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ch.cargomedia.wms.process;

import java.util.ArrayList;

public class ProcessSequence {

private ArrayList<String[]> _commandList = new ArrayList<String[]>();
private InterruptibleProcess _processWorker = null;
private Boolean _interrupted = false;

public void addCommand(String[] command) {
_commandList.add(command);
}

public void runAll() throws Exception {
for (String[] command : _commandList) {
if (_interrupted) {
throw new InterruptedException();
}
_processWorker = new InterruptibleProcess(command);
_processWorker.run();
_processWorker.join(0);
_processWorker.throwExceptionIfAny();
_processWorker = null;
}
_commandList.clear();
}

public void interrupt() {
_interrupted = true;
if (null != _processWorker) {
_processWorker.interrupt();
}
}
}
10 changes: 6 additions & 4 deletions src/ch/cargomedia/wms/transcoder/Archiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import ch.cargomedia.wms.Application;
import ch.cargomedia.wms.Utils;
import ch.cargomedia.wms.process.ProcessSequence;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import com.wowza.wms.logging.WMSLoggerFactory;

import java.io.File;

public class Archiver extends Thread {

private ProcessSequence _processSequence = new ProcessSequence();

private VideostreamPublisher _stream;
private File _input;
private String _pathBinCm;
Expand All @@ -23,8 +26,7 @@ public Archiver(VideostreamPublisher stream) {
public void run() {
File output = Utils.getTempFile("mp4");
try {

Utils.exec(new String[]{
_processSequence.addCommand(new String[]{
"ffmpeg",
"-threads", "1",
"-i", _input.getAbsolutePath(),
Expand All @@ -35,14 +37,14 @@ public void run() {
"-loglevel", "warning",
output.getAbsolutePath(),
});

Utils.exec(new String[]{
_processSequence.addCommand(new String[]{
_pathBinCm,
"stream",
"import-video-archive",
String.valueOf(_stream.getStreamChannelId()),
output.getAbsolutePath(),
});
_processSequence.runAll();

} catch (Exception e) {
WMSLoggerFactory.getLogger(null).error("Cannot create archive: " + e.getMessage());
Expand Down
24 changes: 15 additions & 9 deletions src/ch/cargomedia/wms/transcoder/Thumbnailer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ch.cargomedia.wms.Application;
import ch.cargomedia.wms.Utils;
import ch.cargomedia.wms.process.ProcessSequence;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import com.wowza.wms.logging.WMSLoggerFactory;
import com.wowza.wms.stream.IMediaStream;
Expand All @@ -11,17 +12,18 @@

public class Thumbnailer extends TimerTask {

private ProcessSequence _processSequence = new ProcessSequence();

private VideostreamPublisher _stream;
private String _input;
private String _pathBinCm;
private int _width;
private int _height;
private IMediaStream _mediaStream;


public Thumbnailer(VideostreamPublisher stream, IMediaStream mediaStream) {
Application application = Application.getInstance();
_stream = stream;
_mediaStream = mediaStream;
_input = "rtmp://127.0.0.1/" + application.getName() + "/" + stream.getStreamName();
_pathBinCm = application.getConfig().getCmBinPath();
_width = application.getConfig().getThumbnailWidth();
Expand All @@ -31,8 +33,7 @@ public Thumbnailer(VideostreamPublisher stream, IMediaStream mediaStream) {
public void run() {
File output = Utils.getTempFile("png");
try {

Utils.exec(new String[]{
_processSequence.addCommand(new String[]{
"ffmpeg",
"-threads", "1",
"-i", _input,
Expand All @@ -45,20 +46,25 @@ public void run() {
"-loglevel", "warning",
output.getAbsolutePath(),
});

Utils.exec(new String[]{
_processSequence.addCommand(new String[]{
_pathBinCm,
"stream",
"import-video-thumbnail",
String.valueOf(_stream.getStreamChannelId()),
output.getAbsolutePath(),
});
_processSequence.runAll();

} catch (InterruptedException e) {
WMSLoggerFactory.getLogger(null).info("Thumbnail creation interrupted.");
} catch (Exception e) {
if (_mediaStream.isOpen()) {
WMSLoggerFactory.getLogger(null).error("Cannot create thumbnail: " + e.getMessage());
}
WMSLoggerFactory.getLogger(null).error("Cannot create thumbnail: " + e.getMessage());
}
output.delete();
}

public void interrupt() {
this.cancel();
_processSequence.interrupt();
}
}

0 comments on commit e1a820d

Please sign in to comment.