2014-02-27 09:04:09 +11:00
var net = require ( 'net' ) ,
fs = require ( 'fs' ) ,
dgram = require ( 'dgram' ) ,
2014-02-22 20:13:31 +11:00
eventParser = require ( '../lib/eventParser.js' ) ,
2014-02-27 09:04:09 +11:00
pubsub = require ( 'event-pubsub' ) ;
2014-02-22 20:13:31 +11:00
function emit ( socket , type , data ) {
if ( ! data )
data = false ;
this . log ( 'dispatching event to socket' . debug , ' : ' , type . data , data ) ;
2014-02-27 09:04:09 +11:00
var event = {
type : type ,
data : data
}
if ( this . udp4 || this . udp6 ) {
if ( ! socket . address || ! socket . port ) {
this . log ( 'Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets' ) ;
this . broadcast ( type , data ) ;
return ;
}
this . server . write (
eventParser . format (
event
) ,
socket
)
return ;
} ;
2014-02-22 20:13:31 +11:00
socket . write (
eventParser . format (
2014-02-27 09:04:09 +11:00
event
2014-02-22 20:13:31 +11:00
)
) ;
} ;
function broadcast ( type , data ) {
2014-02-27 09:04:09 +11:00
this . log ( 'broadcasting event to all known sockets listening to ' . debug , this . path . variable , ' : ' , ( ( this . port ) ? this . port : '' ) , type . data , data ) ;
2014-02-22 20:13:31 +11:00
if ( ! data )
data = false ;
var e = eventParser . format (
{
type : type ,
data : data
}
) ;
2014-02-27 09:04:09 +11:00
if ( this . udp4 || this . udp6 ) {
for ( var i = 0 , count = this . sockets . length ; i < count ; i ++ ) {
this . server . write ( e , this . sockets [ i ] ) ;
}
} else {
for ( var i = 0 , count = this . sockets . length ; i < count ; i ++ ) {
this . sockets [ i ] . write ( e ) ;
}
2014-02-22 20:13:31 +11:00
}
} ;
2014-02-26 12:15:43 +11:00
function init ( path , config , log , port ) {
2014-02-22 20:13:31 +11:00
var server = {
config : config ,
path : path ,
2014-02-26 12:15:43 +11:00
port : port ,
2014-02-27 09:04:09 +11:00
udp4 : false ,
udp6 : false ,
2014-02-22 20:13:31 +11:00
log : log ,
server : false ,
sockets : [ ] ,
emit : emit ,
broadcast : broadcast ,
define : {
listen : {
'get.events.broadcasting' : 'does not require any special paramaters' ,
'get.events.listening' : 'does not require any special paramaters'
} ,
broadcast : {
'events.broadcasting' : 'data.events is a JSON object of event definitions by type ' + config . id + ' will broadcast on ' + path ,
'events.listening' : 'data.events is a JSON object of event definitions by type ' + config . id + ' is listening for on ' + path
}
} ,
onStart : function ( socket ) {
this . trigger (
'start' ,
socket
) ;
} ,
start : function ( ) {
if ( ! this . path ) {
console . log ( 'Socket Server Path not specified, refusing to start' . warn ) ;
return ;
}
fs . unlink (
this . path ,
(
function ( server ) {
return function ( ) {
2014-02-26 12:15:43 +11:00
server . log ( 'starting server on ' . debug , server . path . variable , ( ( server . port ) ? ':' + server . port : '' ) . variable ) ;
2014-02-27 09:04:09 +11:00
if ( ! server . udp4 && ! server . udp6 ) {
server . server = net . createServer (
serverCreated
) ;
} else {
function UDPWrite ( message , socket ) {
var data = new Buffer ( message , server . config . encoding ) ;
server . server . send (
data ,
0 ,
data . length ,
socket . port ,
socket . address ,
function ( err , bytes ) {
if ( err ) {
2014-02-22 20:13:31 +11:00
server . trigger (
2014-02-27 09:04:09 +11:00
'error' ,
function ( err ) {
server . trigger ( 'error' , err ) ;
}
) ;
2014-02-22 20:13:31 +11:00
}
}
) ;
}
2014-02-27 09:04:09 +11:00
server . server = dgram . createSocket (
( ( server . udp4 ) ? 'udp4' : 'udp6' )
) ;
server . server . write = UDPWrite ;
server . server . on (
'listening' ,
function ( ) {
serverCreated ( server . server )
}
) ;
}
function serverCreated ( socket ) {
if ( socket . setEncoding )
socket . setEncoding ( server . config . encoding ) ;
server . log ( '## socket connection to server detected ##' . rainbow ) ;
socket . on (
'close' ,
function ( socket ) {
server . trigger (
'close' ,
socket
) ;
}
) ;
socket . on (
'error' ,
function ( err ) {
server . trigger ( 'error' , err ) ;
}
) ;
socket . on (
'data' ,
function ( data , UDPSocket ) {
2014-03-04 07:02:56 +11:00
if ( ! this . ipcBuffer )
this . ipcBuffer = '' ;
data = ( this . ipcBuffer += data ) ;
if ( data . slice ( - 1 ) != eventParser . delimiter ) {
server . log ( 'Socket buffer size exceeded, consider smaller messages or a larger buffer.' . warn , 'Implementing software buffer expansion for this message.' . notice ) ;
return ;
}
this . ipcBuffer = '' ;
2014-02-27 09:04:09 +11:00
data = eventParser . parse ( data ) ;
var sock = ( ( server . udp4 || server . udp6 ) ? UDPSocket : socket ) ;
while ( data . length > 0 ) {
var e = JSON . parse ( data . shift ( ) ) ;
server . log ( 'received event of : ' . debug , e . type . data , e . data ) ;
server . sockets . push ( sock ) ;
server . trigger (
e . type ,
e . data ,
sock
) ;
}
}
) ;
socket . on (
'message' ,
function ( msg , rinfo ) {
2014-02-28 07:31:40 +11:00
if ( ! rinfo )
return ;
2014-02-27 09:04:09 +11:00
server . log ( 'Received UDP message from ' . debug , rinfo . address . variable , rinfo . port ) ;
socket . emit ( 'data' , msg . toString ( ) , rinfo ) ;
}
) ;
server . trigger (
'connect' ,
socket
) ;
server . trigger (
'get.events.broadcasting' ,
socket
) ;
server . trigger (
'get.events.listening' ,
socket
) ;
}
2014-02-22 20:13:31 +11:00
2014-02-26 12:15:43 +11:00
function started ( socket ) {
server . onStart ( socket )
}
if ( ! port ) {
2014-02-27 09:04:09 +11:00
server . log ( 'starting server as' . debug , 'Unix Socket' . variable ) ;
2014-02-26 12:15:43 +11:00
server . server . listen (
server . path ,
started
) ;
server . server . maxConnections = server . maxConnections ;
return ;
}
2014-02-22 20:13:31 +11:00
2014-02-27 09:04:09 +11:00
if ( ! server . udp4 && ! server . udp4 ) {
server . log ( 'starting server as' . debug , 'TCP' . variable ) ;
server . server . listen (
server . port ,
server . path ,
started
) ;
return ;
}
server . log ( 'starting server as' . debug , ( ( server . udp4 ) ? 'udp4' : 'udp6' ) . variable ) ;
server . server . bind (
server . port ,
server . path
) ;
started (
{
address : server . path ,
port : server . port
}
2014-02-22 20:13:31 +11:00
) ;
}
}
) ( this )
) ;
}
} ;
new pubsub ( server ) ;
server . on (
'get.events.broadcasting' ,
function ( socket ) {
server . emit (
socket ,
'events.broadcasting' ,
{
id : server . config . id ,
events : server . define . broadcast
}
) ;
}
) ;
server . on (
'get.events.listening' ,
function ( socket ) {
server . emit (
socket ,
'events.listening' ,
{
id : server . config . id ,
events : server . define . listen ,
}
) ;
}
)
server . on (
'close' ,
function ( ) {
for ( var i = 0 , count = server . sockets . length ; i < count ; i ++ ) {
var socket = server . sockets [ i ] ;
2014-03-01 21:30:26 +11:00
if ( socket ) {
if ( socket . readable )
continue ;
}
2014-02-22 20:13:31 +11:00
server . log ( 'Socket disconnected' . notice ) ;
2014-03-01 21:30:26 +11:00
if ( socket )
socket . destroy ( ) ;
server . sockets . splice ( i , 1 ) ;
2014-02-22 20:13:31 +11:00
server . trigger (
'socket.disconnected'
) ;
return ;
}
}
) ;
return server ;
}
2014-02-28 07:31:40 +11:00
module . exports = init ;