-
Notifications
You must be signed in to change notification settings - Fork 223
Scalability and load balancing
Even if RTMFP is a P2P protocol, RTMFP has need to have a server end-point to negotiate the P2P connection between both clients. Then, RTMFP can be used just for its UDP based abilities, like real-time video/audio streaming, in a classical client-server way (without using P2P feature). For these reasons, server load is always applied and central availability required. Of course, one machine has always some hardware limitation (CPU/memory), and when load required becomes too important, a CumulusServer instance can not be enough.
To solve this loading requirement, a full framework included in CumuluServer allows to make communicate multiple CumulusServer instances, to enlarge computing and receiving capacity. It allows to configure a multiple CumulusServer environment, it offers detection of server connection and disconnection, exchange of data between them, redirection of clients between servers to deploy a load-balacing software system for example, and some well-thought features to synchronise client informations for the rendez-vous service and the NetGroup RTMFP options. Every communication between servers is done in a raw TCP way.
The main idea is simple: by default, each instance is an independant server and share nothing with others, it's you who decides what are the resources that it has to share between all the server instances.
This page intends to describe every features of this framework illustrated with some code samples and context usage. Of course, the Server Application, API page lists all these feature but without code samples or any utilization context.
Finally some piece of script code illustrates as uses it, to know how create a application server see Server Application page.
Firstly to make communicate many instances of CumulusServer, you have to configure them. The three following configurations allows to make working the multiple servers mode:
- publicAddress to configure the public address server to make working the client redirections.
- servers.port to configure the port to receive incoming server connections.
- servers.targets to configure the addresses of remote CumulusServer instances trying to join.
Here follows an illustration of one configuration with two servers:
![CumulusServers](https://github.com/downloads/OpenRTMFP/Cumulus/CumulusServers.png)Warning
Exchange between servers is done in a uncrypted TCP way, so to avoid an attack by the incoming port of B, its servers.port configured should be protected by a firewall to allow just a connection by an other server and nothing else.
A initializes here the connection to B (server.targets configured). A sees B as a target:
-- Server application on A side
function onServerConnection(server)
if server.isTarget then
NOTE("Target gotten : "..server.address.."("..server.publicAddress.." for clients)"
-- displays "Target gotten : 192.168.0.2 (www.hostB.com for clients)"
end
end
B, who has a incoming port configured (1936), accepts the connection of A. B sees A as an initiator:
-- Server application on B side
function onServerConnection(server)
NOTE(server.isTarget) -- displays "false"
end
Warning
If server A and B configures each other as its target, the two TCP connections will be created, causing confusion in server exchange:
This configuration system allows to scale an existing system horizontaly without having to restart server already running. Indeed, the first server started can configure its incoming server port (servers.port) and no target, and a new server can come to extend the system in putting the address of the first server in its servers.targets configuration.
Of course, complex configurations are possible, with multiple server (and properties individual by server, see Configurations part of Installation page):
;CumulusServer.ini
publicAddress = www.myhost.com:1935
[servers]
targets = 192.168.0.2:1936?type=master;192.168.0.3:1936
function onServerConnection(server)
if server.type=="master" then -- true here just for 192.168.0.2:1936 server
NOTE("Master server connected")
end
end
function onServerDisconnection(server)
if server.type=="master" then -- true here just for 192.168.0.2:1936 server
NOTE("Master server disconnected")
end
end
To exchange data between servers you have to call the server:send method on sender side (see server object description on Server Application API) and you have to define RPC server functions as a member of server object on the receiver side:
function onServerConnection(server)
-- RPC function declaration, to receive data from one other server
function server:onHello(name)
self.name = name
end
-- send my name to the incoming server (it will receive it on its "onHello" method)
server:send("onHello","CumulusServer A")
end
-- now you can find the name of each server everywhere
for index,server in cumulus.servers:ipairs() do
NOTE("Server '"..server.name.."' at address "..server.address)
end
Warning
self.name = name in the function body of onHello creates on the server object a name value. Beware with this kind of thing on server object, it's shared with all other Server Application. If one other server application attachs too a name value to this server object, it will overload the previous assignment. A solution can be to prefix the property by the name of the current application.
-- following server (horizontal scaling)
_server = nil
-- number of subscribers (listeners) for this server
_subscribers = 0
function onServerConnection(server)
if server.isTarget then
-- incoming server is a following server!
if _server then error("following server already connected") end
_server = server
-- informs the following server about my publications
for id,publication in cumulus.publications:pairs() do
_server:send("publish",publication.name)
end
else
-- incoming server is a previous server, we have to create RPC function to receive
-- its publication informations
server.publications = {}
function server:publish(name)
-- publication creation
self.publications[name] = cumulus:publish(name)
end
function server:unpublish(name)
-- publication suppression
local publication = self.publications[name]
if publication then publication:close() end
self.publications[name] = nil
end
function server:video(name,time,packet)
local publication = self.publications[name]
-- give the video packet to our publication copy
if publication then publication:pushVideoPacket(time,packet) end
end
function server:audio(name,time,packet)
local publication = self.publications[name]
-- give the audio packet to our publication copy
if publication then publication:pushVideoPacket(time,packet) end
end
function server:data(name,time,packet)
local publication = self.publications[name]
-- give the data packet to our publication copy
if publication then publication:pushVideoPacket(time,packet) end
end
end
end
function onServerDisconnection(server)
if server.isTarget then
-- disconnected server was a following server!
_server = nil
return
end
-- disconnected server was a previous server, close its publications
for id,publication in server.publications do
publication:close()
end
end
function onPublish(client,publication)
-- informs the following server about this publication
if _server then _server:send("publish",publication.name) end
end
function onUnpublish(client,publication)
-- informs the following server about this unpublication
if _server then _server:send("unpublish",publication.name) end
end
function onSubscribe(client,listener)
-- if a following server exist, and if this server has more than 400 subscribers
-- redirect the client to the following server
if _server and _subscribers>=400 then
client.writer:writeAMFMessage("redirect",_server.publicAddress)
client.writer:close()
else
_subscribers = _subscribers + 1
end
end
function onUnsubscribe(client,listener)
_subscribers = _subscribers - 1
end
function onVideoPacket(client,publication,time,packet)
if not _server then return end
-- forward the video packet to the following server
_server:send("video",publication.name,time,packet)
end
function onAudioPacket(client,publication,time,packet)
if not _server then return end
-- forward the audio packet to the following server
_server:send("audio",publication.name,time,packet)
end
function onDataPacket(client,publication,name,packet)
if not _server then return end
-- forward the data packet to the following server
_server:send("data",publication.name,time,packet)
end
In a load-balacing solution, usually we opt for hardware solution with a DNS which returns an address ip rotated on a list of addresses. You can realize it in a software way in using the onHandshake event (see Server Application, API page for complete details on this event):
-- index incremented to redirect client equally to each server
index=0
function onHandshake(address,path,properties,attempts)
index=index+1
if index>cumulus.servers.count then index=1 end -- not exceed the number of server available
return cumulus.servers(index) -- load-balacing system!
end
Here the server doesn't accept any connection client, it redirects the cleint in handshake performing. There is no real benefits comparing with a hardware solution. An other possibility is of returning many server addresses to benefit of parallel connection behavior of RTMFP protocol.
function onHandshake(address,path,properties,attempts)
return cumulus.servers
end
Indeed, the client will receive multiple server addresses, and in this case, RTMFP starts multiple connection attempt in parallel, and keep only the faster to answer. It's an other way of load-balacing system: the more faster wins.
About the P2P rendezvous service of Cumulus, in a multiple servers way, if the peerA connected to CumulusServerA requests a connection to the peerB connected to CumulusServerB, of course CumulusServerA will be unable to return information about peerB. We have to use the onRendezVousUnknown event (see Server Application, API page for complete details on this event):
function onRendezVousUnknown(peerId)
return cumulus.servers -- redirect to all the connected servers
end
With the above code addition, you can redirect a rendezvous request which fails to other servers.
But it's always missing a solution to synchronize member of groups in NetGroup usage case. Indeed, a groupA can exists on serverA and contains peerA, and the same groupA can exists on serverB too and contains peerB. peerB and peerA will never meet them. To solve it, you have to use "groups:join" method (see "groups" object description on Server Application, API page for complete description of this method). The idea is simple: you have to share every group inclusion informations between all servers. The following server application code realizes this sharing job:
function onRendezVousUnknown(peerId)
return cumulus.servers -- redirect to all the connected servers
end
function onJoinGroup(client,group)
-- inform other servers of this joining operation
cumulus.servers:broadcast("join",group.rawId,client.rawId)
end
function onUnjoinGroup(client,group)
-- inform other servers of this unjoining operation
cumulus.servers:broadcast("unjoin",group.rawId,client.rawId)
end
function onServerConnection(server)
-- inform this new incoming server of my group/client relations existing
for id,group in cumulus.groups:pairs() do
for i,client in cumulus.groups:ipairs() do
server:send("join",group.rawId,client.rawId)
end
end
server.groups = {}
-- RPC server functions to receive joining/unjoining operation
function server:join(groupId,clientId)
-- creation of a virtual member for this group
local member = cumulus:join(groupId,clientId)
if not member then return end -- join operation has failed
-- We have to attach this member object to its server
-- to avoid its destruction by the LUA garbage collector
local group = self.groups[groupId]
if not group then self.groups[groupId] = {size=0} end
group.size = group.size + 1
group[clientId] = member
end
function server:unjoin(groupId,clientId)
-- suppression of a possible virtual member of group
if not group then return end
local member = group[clientId]
if member then
member:release() -- detach of its group
group[clientId] = nil
group.size = group.size - 1
end
-- erase the group object if it's empty now
if group.size==0 then self.groups[groupId]=nil end
end
end
function onServerDisconnection(server)
-- suppression of possible virtual members attached to this server
for id,group in pairs(server.groups) do
for id,member in pairs(group) do
if id ~= "size" then member:release() end
end
end
end