Scalability and load balancing
Although RTMFP is a P2P protocol, it needs a server end-point to negotiate the P2P connection between the two clients. RTMFP can also be used only for its UDP-based abilities, such as real-time video/audio streaming, in a classical client-server way (without the P2P feature). For these reasons there is always some load on the server, and central availability is required. A single machine always has hardware limits (CPU, memory), and when the load becomes too heavy, one CumulusServer instance may not be enough.
To meet this need, a full framework included in CumulusServer lets multiple CumulusServer instances communicate, to extend computing and receiving capacity. It lets you configure a multi-server environment, and it offers detection of server connection and disconnection, exchange of data between servers, redirection of clients between servers (to deploy a software load-balancing system, for example), and features to synchronise client information for the rendezvous service and the NetGroup RTMFP options. All communication between servers goes over raw TCP.
The main idea is simple: by default, each instance is an independent server and shares nothing with the others; you decide which resources are shared between the server instances.
This page describes every feature of this framework, illustrated with code samples and usage context. The Server Application, API page lists all these features, but without code samples or usage context.
The script samples here illustrate how to use the framework; to learn how to create a server application, see the Server Application page.
First, to make several CumulusServer instances communicate, you have to configure them. The following three settings enable the multiple-servers mode:
- publicAddress: the public address of the server, needed for client redirection to work.
- servers.port: the port on which incoming server connections are received.
- servers.targets: the addresses of the remote CumulusServer instances to join.
Here is an illustration of a configuration with two servers:
![CumulusServers](https://github.com/downloads/OpenRTMFP/Cumulus/CumulusServers.png)
Warning
Exchange between servers goes over unencrypted TCP. To avoid an attack through the incoming port of B, the port configured in servers.port should be protected by a firewall so that it accepts connections from the other servers and nothing else.
Here A initiates the connection to B (servers.targets configured). A sees B as a target:
-- Server application on A side
function onServerConnection(server)
  if server.isTarget then
    NOTE("Target gotten : "..server.address.." ("..server.publicAddress.." for clients)")
    -- displays "Target gotten : 192.168.0.2 (www.hostB.com for clients)"
  end
end
B, which has an incoming port configured (1936), accepts the connection from A. B sees A as an initiator:
-- Server application on B side
function onServerConnection(server)
  NOTE(server.isTarget) -- displays "false"
end
Warning
If servers A and B each configure the other as a target, two TCP connections are created, which causes confusion in the exchanges between the servers.
This configuration system makes it possible to scale an existing system horizontally without restarting the servers already running. The first server started can configure its incoming server port (servers.port) with no target, and a new server can then extend the system by putting the address of the first server in its servers.targets configuration.
Of course, more complex configurations are possible, with multiple servers and individual properties per server (see the Configurations part of the Installation page):
;CumulusServer.ini
publicAddress = www.myhost.com:1935
[servers]
targets = 192.168.0.2:1936?type=master;192.168.0.3:1936
function onServerConnection(server)
  if server.type=="master" then -- true here just for 192.168.0.2:1936 server
    NOTE("Master server connected")
  end
end
function onServerDisconnection(server)
  if server.type=="master" then -- true here just for 192.168.0.2:1936 server
    NOTE("Master server disconnected")
  end
end
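
The `type` property from the sample above can also drive simple bookkeeping. The following is a minimal sketch that counts connected servers per type. The `counts` table, the `masterAvailable` helper and the "untyped" bucket are names introduced for this illustration, and the stub server tables at the end only stand in for real server objects so that the fragment runs in a plain Lua interpreter.

```lua
-- Hypothetical bookkeeping: number of connected servers per "type" property.
-- Servers without a type are counted under "untyped" (an assumption of this sketch).
local counts = {}

local function typeOf(server)
  return server.type or "untyped"
end

function onServerConnection(server)
  local t = typeOf(server)
  counts[t] = (counts[t] or 0) + 1
end

function onServerDisconnection(server)
  local t = typeOf(server)
  counts[t] = math.max((counts[t] or 0) - 1, 0)
end

-- true while at least one server declared as "master" is connected
function masterAvailable()
  return (counts["master"] or 0) > 0
end

-- stand-alone check with stub tables (not real Cumulus server objects)
local master = { address = "192.168.0.2:1936", type = "master" }
local other  = { address = "192.168.0.3:1936" }
onServerConnection(master)
onServerConnection(other)
print(masterAvailable())      -- true
onServerDisconnection(master)
print(masterAvailable())      -- false
```

An application could consult such a helper before redirecting clients, for instance to avoid sending them to a master that has just disconnected.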
function onServerConnection(server)
  -- RPC function declaration, called by the other server
  function server:onHello(name)
    self.name = name
  end
  -- send my name to this server (it will receive it on its "onHello" method)
  server:send("onHello","CumulusServer A")
end
-- now you can find the name of each server everywhere
for index,server in cumulus.servers:ipairs() do
  NOTE("Server '"..server.name.."' at address "..server.address)
end
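
To make the mechanism concrete, the stand-alone sketch below imitates the name-based dispatch that the sample above relies on: the first argument of `send` names a function declared on the receiver's server object, and the remaining arguments are passed to it. `newLink` and `deliver` are hypothetical helpers written for this illustration. They are not part of the Cumulus API, and the real exchange goes through TCP rather than a direct call.

```lua
-- Hypothetical stand-in for a server object; it only illustrates name-based dispatch.
function newLink(address)
  local link = { address = address }
  -- deliver: what the receiving side conceptually does when a message arrives
  function link:deliver(handler, ...)
    local fn = self[handler]
    if type(fn) ~= "function" then
      return false -- no RPC function declared under that name
    end
    fn(self, ...)
    return true
  end
  return link
end

-- receiving side: declare the RPC function on the object that represents the peer
local peer = newLink("192.168.0.2:1936")
function peer:onHello(name)
  self.name = name
end

-- stands for the peer having called server:send("onHello","CumulusServer A")
print(peer:deliver("onHello", "CumulusServer A")) -- true
print(peer.name)                                   -- CumulusServer A
print(peer:deliver("onUnknown"))                   -- false
```

The point of the sketch is that a message whose name matches no declared function has nowhere to go, so both sides need to agree on the function names they exchange.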
Warning
Properties of the server object are shared across all applications.
-- following server (horizontal scaling)
_server = nil
-- number of subscribers (listeners) for this server
_subscribers = 0
function onServerConnection(server)
  if server.isTarget then
    -- incoming server is a following server!
    if _server then error("following server already connected") end
    _server = server
    -- informs the following server about my publications
    for id,publication in cumulus.publications:pairs() do
      _server:send("publish",publication.name)
    end
  else
    -- incoming server is a previous server, we have to create RPC functions to receive
    -- its publication information
    server.publications = {}
    function server:publish(name)
      -- publication creation
      self.publications[name] = cumulus:publish(name)
    end
    function server:unpublish(name)
      -- publication removal
      local publication = self.publications[name]
      if publication then publication:close() end
      self.publications[name] = nil
    end
    function server:video(name,time,packet)
      local publication = self.publications[name]
      -- give the video packet to our publication copy
      if publication then publication:pushVideoPacket(time,packet) end
    end
    function server:audio(name,time,packet)
      local publication = self.publications[name]
      -- give the audio packet to our publication copy
      if publication then publication:pushAudioPacket(time,packet) end
    end
    function server:data(name,dataName,packet)
      local publication = self.publications[name]
      -- give the data packet to our publication copy
      if publication then publication:pushDataPacket(dataName,packet) end
    end
  end
end
function onServerDisconnection(server)
  if server.isTarget then
    -- disconnected server was a following server!
    _server = nil
    return
  end
  -- disconnected server was a previous server, close its publications
  for name,publication in pairs(server.publications) do
    publication:close()
  end
end
function onPublish(client,publication)
  -- informs the following server about this publication
  if _server then _server:send("publish",publication.name) end
end
function onUnpublish(client,publication)
  -- informs the following server about this unpublication
  if _server then _server:send("unpublish",publication.name) end
end
function onSubscribe(client,listener)
  -- if a following server exists, and if this server has 400 subscribers or more,
  -- redirect the client to the following server
  if _server and _subscribers>=400 then
    client.writer:writeAMFMessage("redirect",_server.publicAddress)
    client.writer:close()
  else
    _subscribers = _subscribers + 1
  end
end
function onUnsubscribe(client,listener)
  _subscribers = _subscribers - 1
end
function onVideoPacket(client,publication,time,packet)
  if not _server then return end
  -- forward the video packet to the following server
  _server:send("video",publication.name,time,packet)
end
function onAudioPacket(client,publication,time,packet)
  if not _server then return end
  -- forward the audio packet to the following server
  _server:send("audio",publication.name,time,packet)
end
function onDataPacket(client,publication,name,packet)
  if not _server then return end
  -- forward the data packet (with its own name) to the following server
  _server:send("data",publication.name,name,packet)
end
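
In the sample above, `onUnsubscribe` decrements `_subscribers` for every listener. If that event is also raised for a client that was redirected, and so never counted, the counter drifts downward. One possible refinement is sketched below. It assumes that the same `listener` value is passed to both `onSubscribe` and `onUnsubscribe`, which this page does not confirm. `_counted` and `MAX_SUBSCRIBERS` are names introduced for this example.

```lua
-- Sketch only: _counted and MAX_SUBSCRIBERS are names introduced for this example.
MAX_SUBSCRIBERS = 400
_server = nil       -- following server, assigned in onServerConnection as in the sample above
_subscribers = 0
_counted = {}       -- listeners that were actually added to _subscribers

function onSubscribe(client,listener)
  if _server and _subscribers >= MAX_SUBSCRIBERS then
    -- server is full: redirect the client to the following server
    client.writer:writeAMFMessage("redirect",_server.publicAddress)
    client.writer:close()
  else
    _subscribers = _subscribers + 1
    _counted[listener] = true
  end
end

function onUnsubscribe(client,listener)
  -- ignore listeners that were redirected and therefore never counted
  if _counted[listener] then
    _counted[listener] = nil
    _subscribers = _subscribers - 1
  end
end
```

With this variant the counter only moves for listeners that were accepted locally, so the 400-subscriber threshold keeps its meaning over time.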
Load balancing
For load balancing, the usual choice is a hardware solution, with a DNS that returns an IP address rotated over a list of addresses. You can achieve the same in software by using the onHandshake event (see the Server Application, API page for more details on this event):
-- index incremented to redirect clients equally to each server
index=0
function onHandshake(address,path,properties,attempts)
  index=index+1
  if index>cumulus.servers.count then index=1 end -- do not exceed the number of servers available
  return cumulus.servers(index) -- load-balancing system!
end
Here the server doesn't accept any client connection; it redirects the client during the handshake. There is no real benefit compared with a hardware solution. Another possibility is to return several server addresses, to benefit from the parallel connection behaviour of the RTMFP protocol.
function onHandshake(address,path,properties,attempts)
  return cumulus.servers
end
Indeed, the client receives multiple server addresses, and in this case RTMFP starts multiple connection attempts in parallel and keeps only the fastest to answer. It is another kind of load-balancing system: the fastest wins.
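
The rotation used in the first `onHandshake` sample can be isolated in a small helper, to check its wrap-around behaviour outside the server. `nextIndex` is a hypothetical helper, and `count` stands for `cumulus.servers.count`. Returning nil when no server is connected is an assumption of this sketch, not documented behaviour.

```lua
-- nextIndex is a hypothetical helper: it advances a 1-based index over `count` entries
-- and wraps back to 1 after the last one. It returns nil when count is 0.
function nextIndex(index, count)
  if count < 1 then return nil end
  index = index + 1
  if index > count then index = 1 end
  return index
end

-- stand-alone check: three servers, five successive handshakes
local index, picks = 0, {}
for i = 1, 5 do
  index = nextIndex(index, 3)
  picks[#picks + 1] = index
end
print(table.concat(picks, ","))  -- 1,2,3,1,2
```

Because the index is compared with the current count on every call, the rotation also falls back to 1 when a server has disconnected and the list has become shorter.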