Skip to content

Scalability and load balancing

cumulusdev edited this page Nov 30, 2012 · 27 revisions

Scalability and load-balancing

Presentation

Even if RTMFP is a P2P protocol, RTMFP has need to have a server end-point to negotiate the P2P connection between both clients. Then, RTMFP can be used just for its UDP based abilities, like real-time video/audio streaming, in a classical client-server way (without using P2P feature). For these reasons, server load is always applied and central availability required. Of course, one machine has always some hardware limitation (CPU/memory), and when load required becomes too important, a CumulusServer instance can not be enough.

To solve this loading requirement, a full framework included in CumuluServer allows to make communicate multiple CumulusServer instances, to enlarge computing and receiving capacity. It allows to configure a multiple CumulusServer environment, it offers detection of server connection and disconnection, exchange of data between them, redirection of clients between servers to deploy a load-balacing software system for example, and some well-thought features to synchronise client informations for the rendez-vous service and the NetGroup RTMFP options. Every communication between servers is done in a raw TCP way.

The main idea is simple: by default, each instance is an independant server and share nothing with others, it's you who decides what are the resources that it has to share between all the server instances.

This page intends to describe every features of this framework illustrated with some code samples and context usage. Of course, the Server Application, API page lists all these feature but without code samples or any utilization context.

Finally some piece of script code illustrates as uses it, to know how create a application server see Server Application page.

Configurations

Firstly to make communicate many instances of CumulusServer, you have to configure them. The three following configurations allows to make working the multiple servers mode:

  • publicAddress to configure the public address server to make working the client redirections.
  • servers.port to configure the port to receive incoming server connections.
  • servers.targets to configure the addresses of remote CumulusServer instances trying to join.

Here follows an illustration of one configuration with two servers:

![CumulusServers](https://github.com/downloads/OpenRTMFP/Cumulus/CumulusServers.png)

Warning Exchange between servers is done in a uncrypted TCP way, so to avoid an attack by the incoming port of B, its servers.port configured should be protected by a firewall to allow just a connection by an other server and nothing else.

A initializes here the connection to B (server.targets configured). A sees B as a target:

	-- Server application on A side
	function onServerConnection(server)
		if server.isTarget then
			NOTE("Target gotten : "..server.address.."("..server.publicAddress.." for clients)"
			-- displays "Target gotten : 192.168.0.2 (www.hostB.com for clients)"
		end
	end

B, who has a incoming port configured (1936), accepts the connection of A. B sees A as an initiator:

	-- Server application on B side
	function onServerConnection(server)
		NOTE(server.isTarget) -- displays "false"
	end

Warning If server A and B configures each other as its target, the two TCP connections will be created, causing confusion in server exchange:

![CumulusServers_DoubleConnection](https://github.com/downloads/OpenRTMFP/Cumulus/CumulusServers_DoubleConnection.png)

This configuration system allows to scale an existing system horizontaly without having to restart server already running. Indeed, the first server started can configure its incoming server port (servers.port) and no target, and a new server can come to extend the system in putting the address of the first server in its servers.targets configuration.

Of course, complex configurations are possible, with multiple server (and properties individual by server, see Configurations part of Installation page):

	;CumulusServer.ini
	publicAddress = www.myhost.com:1935
	[servers]
	targets = 192.168.0.2:1936?type=master;192.168.0.3:1936
	function onServerConnection(server)
		if server.type=="master" then -- true here just for 192.168.0.2:1936 server
			NOTE("Master server connected")
		end
	end
	function onServerDisconnection(server)
		if server.type=="master" then -- true here just for 192.168.0.2:1936 server
			NOTE("Master server disconnected")
		end
	end

Exchange data and resources

	function onServerConnection(server)
		-- RPC function declaration, called by one other server
		function server:onHello(name)
			self.name = name
		end
		-- send my name to this server (it will receive it on its "onHello" method)
		server:send("onHello","CumulusServer A")
	end

	-- now you can find the name of each server everywhere
	for index,server in cumulus.servers:ipairs() do
		NOTE("Server '"..server.name.."' at address "..server.address)
	end

Warning server object properties (share in all applications).

![CumulusServers_Publication](https://github.com/downloads/OpenRTMFP/Cumulus/CumulusServers_Publication.png)
	-- following server (horizontal scaling)
	_server = nil

	-- number of subscribers (listeners) for this server
	_subscribers = 0

	function onServerConnection(server)
		if server.isTarget then
			-- incoming server is a following server!
			if _server then error("following server already connected") end
			_server = server
			-- informs the following server about my publications
			for id,publication in cumulus.publications:pairs() do
				_server:send("publish",publication.name)
			end
		else
			-- incoming server is a previous server, we have to create RPC function to receive
			-- its publication informations
			server.publications = {}
			function server:publish(name)
				-- publication creation
				self.publications[name] = cumulus:publish(name)
			end
			function server:unpublish(name)
				-- publication suppression
				local publication = self.publications[name]
				if publication then publication:close() end
				self.publications[name] = nil
			end
			function server:video(name,time,packet)
				local publication = self.publications[name]
				-- give the video packet to our publication copy
				if publication then publication:pushVideoPacket(time,packet) end
			end
			function server:audio(name,time,packet)
				local publication = self.publications[name]
				-- give the audio packet to our publication copy
				if publication then publication:pushVideoPacket(time,packet) end
			end
			function server:data(name,time,packet)
				local publication = self.publications[name]
				-- give the data packet to our publication copy
				if publication then publication:pushVideoPacket(time,packet) end
			end
		end
	end

	function onServerDisconnection(server)
		if server.isTarget then
			-- disconnected server was a following server!
			_server = nil
			return
		end
		-- disconnected server was a previous server, close its publications
		for id,publication in server.publications do
			publication:close()
		end
	end

	function onPublish(client,publication)
		-- informs the following server about this publication
		if _server then _server:send("publish",publication.name) end
	end

	function onUnpublish(client,publication)
		-- informs the following server about this unpublication
		if _server then _server:send("unpublish",publication.name) end
	end

	function onSubscribe(client,listener)
		-- if a following server exist, and if this server has more than 400 subscribers
		-- redirect the client to the following server
		if _server and _subscribers>=400 then
			client.writer:writeAMFMessage("redirect",_server.publicAddress)
			client.writer:close()
		else
			_subscribers = _subscribers + 1
		end
	end

	function onUnsubscribe(client,listener)
		_subscribers = _subscribers - 1
	end
	
	function onVideoPacket(client,publication,time,packet)
		if not _server then return end
		-- forward the video packet to the following server
		_server:send("video",publication.name,time,packet)
	end

	function onAudioPacket(client,publication,time,packet)
		if not _server then return end
		-- forward the audio packet to the following server
		_server:send("audio",publication.name,time,packet)
	end

	function onDataPacket(client,publication,name,packet)
		if not _server then return end
		-- forward the data packet to the following server
		_server:send("data",publication.name,time,packet)
	end

Load balancing and rendezvous service

Load balancing

In a load-balacing solution, usually we opt for hardware solution with a DNS which returns an address ip rotated on a list of addresses. You can realize it in a software way in using the onHandshake event (see Server Application, API page for more details on this event):

	-- index incremented to redirect client equally to each server
	index=0
	function onHandshake(address,path,properties,attempts)
		index=index+1
		if index>cumulus.servers.count then index=1 end -- not exceed the number of server available
		return cumulus.servers(index) -- load-balacing system!
	end

Here the server doesn't accept any connection client, it redirects tje cleint in handshake performing. There is no real benefits comparing with a hardware solution. An other possibility is of returning many server addresses to benefit of parallel connection behavior of RTMFP protocol.

	function onHandshake(address,path,properties,attempts)
		return cumulus.servers
	end

Indeed, the client will receive multiple server addresses, and in this case, RTMFP starts multiple connection attempt in parallel, and keep only the faster to answer. It's an other way of load-balacing system: the more faster wins.

Clone this wiki locally