Skip to content
This repository has been archived by the owner on Mar 19, 2018. It is now read-only.

Commit

Permalink
Merge pull request #4 from njam/wowza-bin-cm
Browse files Browse the repository at this point in the history
Import wowza thumbnails+archive to app with CLI tool
  • Loading branch information
njam committed May 12, 2014
2 parents 5be41d9 + a22d9e0 commit c3c66b7
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 209 deletions.
10 changes: 10 additions & 0 deletions CM-wowza.iml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="lib" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
<root url="file://$MODULE_DIR$/lib" />
</CLASSES>
<JAVADOC />
<SOURCES />
<jarDirectory url="file://$MODULE_DIR$/lib" recursive="false" />
</library>
</orderEntry>
</component>
</module>

35 changes: 32 additions & 3 deletions src/ch/cargomedia/wms/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import ch.cargomedia.wms.stream.Videostream;
import ch.cargomedia.wms.stream.VideostreamList;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.logging.WMSLoggerFactory;

public class Application {

private static Application _instance;

private Config _config = null;
private IApplicationInstance _appInstance = null;

private VideostreamList<Integer, Videostream> _videostreamList = new VideostreamList<Integer, Videostream>();
private VideostreamList<String, VideostreamPublisher> _videostreamPublisherList = new VideostreamList<String, VideostreamPublisher>();

private Application() {
}

public static Application getInstance() {
if (null == _instance) {
_instance = new Application();
Expand All @@ -29,4 +31,31 @@ public VideostreamList<String, VideostreamPublisher> getVideostreamPublisherList
return this._videostreamPublisherList;
}

public Config getConfig() {
if (null == _config) {
_config = new Config(this.getAppInstance().getProperties());
}
return _config;
}

public String getName() {
return this.getAppInstance().getApplication().getName();
}

public String getStreamStoragePath() {
return this.getAppInstance().getStreamStoragePath();
}

public void setAppInstance(IApplicationInstance appInstance) {
WMSLoggerFactory.getLogger(null).info("setAppInstance: " + appInstance.getName());
this._appInstance = appInstance;
}

public IApplicationInstance getAppInstance() {
if (null == _appInstance) {
throw new RuntimeException("Missing appInstance");
}
return _appInstance;
}

}
37 changes: 32 additions & 5 deletions src/ch/cargomedia/wms/Config.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,40 @@
package ch.cargomedia.wms;

import com.wowza.wms.application.WMSProperties;

public final class Config {
public static final String RPC_UNSUBSCRIBE = "CM_Stream_Video.unsubscribe";
public static final String RPC_UNPUBLISH = "CM_Stream_Video.unpublish";
public static final String RPC_SUBSCRIBE = "CM_Stream_Video.subscribe";
public static final String RPC_PUBLISH = "CM_Stream_Video.publish";
public static final String XMLPROPERTY_THUMBNAIL_AND_ARCHIVE_PATH = "ThumbnailAndArchivePath";
public static final String XMLPROPERTY_THUMBNAIL_WIDTH = "ThumbnailWidth";
public static final Integer BUCKETS_COUNT = 10000;
public static final int THUMBNAILS_INTERVAL = 10000;
public static final int THUMBNAILER_FFMPEG_RETRY_COUNT = 10;

private WMSProperties _properties;

public Config(WMSProperties properties) {
_properties = properties;
}

public String getCmBinPath() {
return this._getPropertyString("cm_bin_path");
}

public Integer getThumbnailWidth() {
return _properties.getPropertyInt("ThumbnailWidth", 240);
}

public Integer getThumbnailInterval() {
return 10000;
}

public String getRpcUrl() {
return this._getPropertyString("RPCUrl");
}

private String _getPropertyString(String key) {
String value = _properties.getPropertyStr(key);
if (null == value || 0 == value.length()) {
throw new RuntimeException("Missing config `" + key + "`.");
}
return value;
}
}
69 changes: 33 additions & 36 deletions src/ch/cargomedia/wms/Utils.java
Original file line number Diff line number Diff line change
@@ -1,52 +1,49 @@
package ch.cargomedia.wms;

import ch.cargomedia.wms.module.eventhandler.ConnectionsListener;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.stream.IMediaStream;
import org.apache.commons.lang.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.UUID;

public class Utils {
public static final int MP4_LIVESTREAM = 0;
public static final int MP4_ARCHIVESTREAM = 1;

public static String[] getArchiveFilePaths(IMediaStream stream, VideostreamPublisher videostreamPublisher) {
String[] files = new String[2];
IApplicationInstance appInstance = ConnectionsListener.appInstance;
int streamId = videostreamPublisher.getStreamId();
String md5Hash = videostreamPublisher.getClientIdMD5Hash();
files[MP4_LIVESTREAM] = appInstance.getStreamStoragePath() + "/" + stream.getName() + ".mp4";
String storageDir = Utils.getStoragePath(videostreamPublisher);
files[MP4_ARCHIVESTREAM] = storageDir + "/" + String.valueOf(streamId) + "-" + md5Hash + "-" + "original" + ".mp4";
return files;
}

public static String getStoragePath(VideostreamPublisher publisher) {
int streamId = publisher.getStreamId();
IApplicationInstance appInstance = ConnectionsListener.appInstance;
String storagePath = appInstance.getProperties().getPropertyStr(Config.XMLPROPERTY_THUMBNAIL_AND_ARCHIVE_PATH) + "/"
+ streamId % Config.BUCKETS_COUNT;
File storageDir = new File(storagePath);
if (!storageDir.exists()) {
storageDir.mkdirs();
public static File getTempFile(String extension) {
String dirPath = System.getProperty("java.io.tmpdir") + "/" + "wowza-cm";
File dir = new File(dirPath);
if (!dir.exists()) {
dir.mkdirs();
}
String filename = UUID.randomUUID().toString();
if (null != extension) {
filename += "." + extension;
}
return appInstance.getProperties().getPropertyStr(Config.XMLPROPERTY_THUMBNAIL_AND_ARCHIVE_PATH) + "/"
+ streamId % Config.BUCKETS_COUNT;
return new File(dir + "/" + filename);
}

public static String getThumbnailStoragePath(VideostreamPublisher publisher) {
int streamId = publisher.getStreamId();
String md5Hash = publisher.getClientIdMD5Hash();
return getStoragePath(publisher) + "/" + String.valueOf(streamId) + "-" + md5Hash + "-thumbs";
public static File getTempFile() {
return getTempFile(null);
}

public static Integer getThumbnailCount(String path) {
File[] thumbnailFiles = new File(path).listFiles();
if (null == thumbnailFiles) {
return 0;
public static String exec(String[] command) throws Exception {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
String output = "";
while ((line = reader.readLine()) != null) {
output += line + "\n";
}
return thumbnailFiles.length;

if (process.waitFor() != 0) {
throw new Exception(String.format("Command exited with code `%s`. \nCommand: %s \nOutput: \n%s",
process.exitValue(), StringUtils.join(command, " "), output));
}

return output;
}

}
1 change: 1 addition & 0 deletions src/ch/cargomedia/wms/http/HTTPConnectionStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.io.OutputStream;

@SuppressWarnings("unused")
public class HTTPConnectionStatus extends HTTProvider2Base {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import com.wowza.wms.stream.IMediaStream;


@SuppressWarnings("unused")
public class ConnectionsListener extends ModuleBase implements IModuleOnStream {

public static IApplicationInstance appInstance;

@SuppressWarnings("unused")
public void onAppStart(IApplicationInstance applInstance) {
appInstance = applInstance;
Application.getInstance().setAppInstance(applInstance);
}

@SuppressWarnings("unused")
Expand All @@ -36,7 +35,6 @@ static public void onClientBWCheck(IClient client, RequestFunction function, AMF
sendResult(client, params, statValues);
}


@SuppressWarnings("unused")
public static void onConnect(IClient client, RequestFunction function, AMFDataList params) {
client.getProperties().setProperty("data", params.getString(3));
Expand Down
39 changes: 13 additions & 26 deletions src/ch/cargomedia/wms/module/eventhandler/StreamListener.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package ch.cargomedia.wms.module.eventhandler;

import ch.cargomedia.wms.Application;
import ch.cargomedia.wms.Config;
import ch.cargomedia.wms.Utils;
import ch.cargomedia.wms.rpc.RPC;
import ch.cargomedia.wms.stream.Videostream;
import ch.cargomedia.wms.stream.VideostreamList;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import ch.cargomedia.wms.stream.VideostreamSubscriber;
import ch.cargomedia.wms.transcoder.ThumbnailCount;
import ch.cargomedia.wms.transcoder.Archiver;
import ch.cargomedia.wms.transcoder.Thumbnailer;
import ch.cargomedia.wms.transcoder.Transcoder;
import com.wowza.wms.amf.AMFPacket;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.media.model.MediaCodecInfoAudio;
Expand All @@ -24,8 +21,7 @@
public class StreamListener implements IMediaStreamActionNotify3 {

private Videostream _videostream;
private ThumbnailCount _thumbnailCountTask;
private volatile Thumbnailer _thumbnailer;
private Thumbnailer _thumbnailer;

public StreamListener(Videostream videostream) {
this._videostream = videostream;
Expand Down Expand Up @@ -126,14 +122,14 @@ private void _onPublish(IMediaStream stream, MediaCodecInfoVideo mciv) throws Ex
}

RPC rpc = new RPC(stream.getClientId());
int streamId = rpc.getPublishStreamId(videostreamPublisher, stream.getName());
videostreamPublisher.setStreamId(streamId);
String storagePath = Utils.getThumbnailStoragePath(videostreamPublisher);
_thumbnailCountTask = new ThumbnailCount(videostreamPublisher, storagePath);
int streamChannelId = rpc.getPublishStreamId(videostreamPublisher, stream.getName());
videostreamPublisher.setStreamChannelId(streamChannelId);

_thumbnailer = new Thumbnailer(videostreamPublisher, stream);
Integer thumbnailInterval = Application.getInstance().getConfig().getThumbnailInterval();
Timer timer = new Timer();
timer.scheduleAtFixedRate(_thumbnailCountTask, Config.THUMBNAILS_INTERVAL, Config.THUMBNAILS_INTERVAL);
_thumbnailer = new Thumbnailer(videostreamPublisher, stream, storagePath);
_thumbnailer.start();
timer.scheduleAtFixedRate(_thumbnailer, 0, thumbnailInterval);

synchronized (videostreamPublishList) {
videostreamPublishList.put(stream.getName(), videostreamPublisher);
}
Expand All @@ -144,28 +140,19 @@ private void _onUnpublish(IMediaStream stream) throws Exception {
VideostreamPublisher videostreamPublisher = videostreamPublishList.get(stream.getName());

if (_thumbnailer != null) {
_thumbnailer.killRunningProcess();
}

if (_thumbnailCountTask != null) {
_thumbnailCountTask.shutdown();
_thumbnailer.cancel();
}

if (videostreamPublisher != null) {
synchronized (videostreamPublishList) {
videostreamPublishList.remove(videostreamPublisher.getStreamName());
}

Integer thumbnailCount = Utils.getThumbnailCount(Utils.getThumbnailStoragePath(videostreamPublisher));
videostreamPublisher.setThumbnailCount(thumbnailCount);

RPC rpc = new RPC(stream.getClient().getClientId());
rpc.notifyUnpublish(stream.getName(), videostreamPublisher.getThumbnailCount());

rpc.notifyUnpublish(stream.getName());

String[] files = Utils.getArchiveFilePaths(stream, videostreamPublisher);
Transcoder finalTranscoder = new Transcoder(files[Utils.MP4_LIVESTREAM], files[Utils.MP4_ARCHIVESTREAM]);
finalTranscoder.start();
Archiver transcoder = new Archiver(videostreamPublisher);
transcoder.start();
}
}

Expand Down
11 changes: 4 additions & 7 deletions src/ch/cargomedia/wms/rpc/RPC.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package ch.cargomedia.wms.rpc;

import ch.cargomedia.wms.Application;
import ch.cargomedia.wms.Config;
import ch.cargomedia.wms.module.eventhandler.ConnectionsListener;
import ch.cargomedia.wms.stream.VideostreamPublisher;
import ch.cargomedia.wms.stream.VideostreamSubscriber;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.logging.WMSLoggerFactory;
import flexjson.JSONSerializer;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand All @@ -22,9 +21,9 @@ public class RPC {
private Integer clientId;

public RPC(Integer clientId) {
WMSProperties streamProperties = ConnectionsListener.appInstance.getProperties();
Application application = Application.getInstance();
this.clientId = clientId;
this.rpcUrl = streamProperties.getPropertyStr("RPCUrl");
this.rpcUrl = application.getConfig().getRpcUrl();
}

public int getPublishStreamId(VideostreamPublisher videostream, String streamKey) throws Exception {
Expand All @@ -34,7 +33,6 @@ public int getPublishStreamId(VideostreamPublisher videostream, String streamKey
params.add(videostream.getStartTimestamp());
params.add(videostream.getWidth());
params.add(videostream.getHeight());
params.add(videostream.getThumbnailCount());
params.add(videostream.getData());

String channelId;
Expand Down Expand Up @@ -63,10 +61,9 @@ public void notifyUnsubscribe(String streamKey) throws Exception {
getPostHttp(Config.RPC_UNSUBSCRIBE, params);
}

public void notifyUnpublish(String streamKey, Integer thumbnailCount) throws Exception {
public void notifyUnpublish(String streamKey) throws Exception {
Vector<Object> params = new Vector<Object>();
params.add(streamKey);
params.add(thumbnailCount);
params.add(String.valueOf(this.clientId));
getPostHttp(Config.RPC_UNPUBLISH, params);
}
Expand Down
Loading

0 comments on commit c3c66b7

Please sign in to comment.