2016-01-10 04:57:08 -08:00
'use strict' ;
2016-01-10 04:18:14 -08:00
const net = require ( 'net' ) ,
tls = require ( 'tls' ) ,
fs = require ( 'fs' ) ,
dgram = require ( 'dgram' ) ,
2014-02-22 01:13:31 -08:00
eventParser = require ( '../lib/eventParser.js' ) ,
2016-01-10 04:18:14 -08:00
Pubsub = require ( 'event-pubsub' ) ,
2015-09-27 03:32:14 -07:00
Message = require ( 'js-message' ) ;
2014-02-22 01:13:31 -08:00
function emit ( socket , type , data ) {
this . log ( 'dispatching event to socket' . debug , ' : ' , type . data , data ) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:18:14 -08:00
let message = new Message ;
2015-09-27 03:32:14 -07:00
message . type = type ;
message . data = data ;
2015-08-22 22:46:55 -07:00
if ( this . config . rawBuffer ) {
2015-09-27 03:32:14 -07:00
message = new Buffer ( type , this . encoding ) ;
2015-08-22 22:46:55 -07:00
} else {
2015-09-27 03:32:14 -07:00
message = eventParser . format ( message ) ;
2015-08-22 22:46:55 -07:00
}
2014-02-26 14:04:09 -08:00
if ( this . udp4 || this . udp6 ) {
2015-09-27 03:32:14 -07:00
2014-02-26 14:04:09 -08:00
if ( ! socket . address || ! socket . port ) {
this . log ( 'Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets' ) ;
this . broadcast ( type , data ) ;
return ;
}
2015-09-27 03:32:14 -07:00
2014-02-26 14:04:09 -08:00
this . server . write (
2015-09-27 03:32:14 -07:00
message ,
2014-02-26 14:04:09 -08:00
socket
2015-08-22 22:46:55 -07:00
) ;
2014-02-26 14:04:09 -08:00
return ;
2016-01-10 04:18:14 -08:00
}
2015-09-27 03:32:14 -07:00
socket . write ( message ) ;
2016-01-10 04:18:14 -08:00
}
2014-02-22 01:13:31 -08:00
function broadcast ( type , data ) {
2016-01-10 04:18:14 -08:00
this . log ( 'broadcasting event to all known sockets listening to ' . debug , this . path . variable , ' : ' , ( ( this . port ) ? this . port : '' ) , type , data ) ;
let message = new Message ;
2015-09-27 03:32:14 -07:00
message . type = type ;
message . data = data ;
2015-08-22 22:46:55 -07:00
if ( this . config . rawBuffer ) {
2015-09-27 03:32:14 -07:00
message = new Buffer ( type , this . encoding ) ;
2015-08-22 22:46:55 -07:00
} else {
2015-09-27 03:32:14 -07:00
message = eventParser . format ( message ) ;
2015-08-22 22:46:55 -07:00
}
2014-02-26 14:04:09 -08:00
if ( this . udp4 || this . udp6 ) {
2016-01-10 04:18:14 -08:00
for ( let i = 1 , count = this . sockets . length ; i < count ; i ++ ) {
2015-09-27 03:32:14 -07:00
this . server . write ( message , this . sockets [ i ] ) ;
2014-02-26 14:04:09 -08:00
}
} else {
2016-01-10 04:18:14 -08:00
for ( let i = 0 , count = this . sockets . length ; i < count ; i ++ ) {
2015-09-27 03:32:14 -07:00
this . sockets [ i ] . write ( message ) ;
2014-02-26 14:04:09 -08:00
}
2014-02-22 01:13:31 -08:00
}
2016-01-10 04:18:14 -08:00
}
2014-02-22 01:13:31 -08:00
2014-02-25 17:15:43 -08:00
function init ( path , config , log , port ) {
2016-01-10 04:18:14 -08:00
let server = {
2014-02-22 01:13:31 -08:00
config : config ,
path : path ,
2014-02-25 17:15:43 -08:00
port : port ,
2014-02-26 14:04:09 -08:00
udp4 : false ,
udp6 : false ,
2014-02-22 01:13:31 -08:00
log : log ,
server : false ,
sockets : [ ] ,
emit : emit ,
broadcast : broadcast ,
2016-01-10 04:57:08 -08:00
onStart : function onStart ( socket ) {
2014-02-22 01:13:31 -08:00
this . trigger (
'start' ,
socket
) ;
} ,
2016-01-10 04:57:08 -08:00
stop : function stop ( ) {
2016-01-04 01:33:55 -08:00
server . server . close ( ) ;
} ,
2016-01-10 04:57:08 -08:00
start : function start ( ) {
2014-02-22 01:13:31 -08:00
if ( ! this . path ) {
2014-03-11 10:44:37 -07:00
server . log ( 'Socket Server Path not specified, refusing to start' . warn ) ;
2014-02-22 01:13:31 -08:00
return ;
}
2015-09-27 03:32:14 -07:00
2014-02-22 01:13:31 -08:00
fs . unlink (
2015-09-27 03:32:14 -07:00
this . path ,
2016-01-10 04:57:08 -08:00
function ( ) {
server . log ( 'starting server on ' . debug , server . path . variable , ( ( server . port ) ? ':' + server . port : '' ) . variable ) ;
if ( ! server . udp4 && ! server . udp6 ) {
if ( ! server . config . tls ) {
server . server = net . createServer (
serverCreated
) ;
} else {
server . log ( 'starting TLS server' . debug , server . config . tls ) ;
if ( server . config . tls . private ) {
server . config . tls . key = fs . readFileSync ( server . config . tls . private ) ;
} else {
server . config . tls . key = fs . readFileSync ( _ _dirname + '/../local-node-ipc-certs/private/server.key' ) ;
}
if ( server . config . tls . public ) {
server . config . tls . cert = fs . readFileSync ( server . config . tls . public ) ;
} else {
server . config . tls . cert = fs . readFileSync ( _ _dirname + '/../local-node-ipc-certs/server.pub' ) ;
}
if ( server . config . tls . dhparam ) {
server . config . tls . dhparam = fs . readFileSync ( server . config . tls . dhparam ) ;
}
if ( server . config . tls . trustedConnections ) {
if ( typeof server . config . tls . trustedConnections === 'string' ) {
server . config . tls . trustedConnections = [ server . config . tls . trustedConnections ] ;
}
server . config . tls . ca = [ ] ;
for ( let i = 0 ; i < server . config . tls . trustedConnections . length ; i ++ ) {
server . config . tls . ca . push (
fs . readFileSync ( server . config . tls . trustedConnections [ i ] )
2015-09-27 21:52:16 -07:00
) ;
}
2016-01-10 04:57:08 -08:00
}
server . server = tls . createServer (
server . config . tls ,
serverCreated
) ;
}
} else {
function UDPWrite ( message , socket ) {
let data = new Buffer ( message , server . config . encoding ) ;
server . server . send (
data ,
0 ,
data . length ,
socket . port ,
socket . address ,
function ( err , bytes ) {
if ( err ) {
server . log ( 'error writing data to socket' . warn , err ) ;
server . trigger (
'error' ,
function ( err ) {
server . trigger ( 'error' , err ) ;
2014-02-22 01:13:31 -08:00
}
2016-01-10 04:57:08 -08:00
) ;
}
2014-02-22 01:13:31 -08:00
}
2016-01-10 04:57:08 -08:00
) ;
}
server . server = dgram . createSocket (
( ( server . udp4 ) ? 'udp4' : 'udp6' )
) ;
server . server . write = UDPWrite ;
server . server . on (
'listening' ,
function ( ) {
serverCreated ( server . server ) ;
}
) ;
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . server . on (
'error' ,
function ( err ) {
server . log ( 'server error' . warn , err ) ;
server . trigger (
'error' ,
err
) ;
}
) ;
server . server . maxConnections = server . config . maxConnections ;
function serverCreated ( socket ) {
server . sockets . push ( socket ) ;
if ( socket . setEncoding ) {
socket . setEncoding ( server . config . encoding ) ;
}
server . log ( '## socket connection to server detected ##' . rainbow ) ;
socket . on (
'close' ,
function ( socket ) {
server . trigger (
'close' ,
socket
2014-02-26 14:04:09 -08:00
) ;
}
2016-01-10 04:57:08 -08:00
) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
socket . on (
'error' ,
function ( err ) {
server . log ( 'server socket error' . warn , err ) ;
2016-01-04 01:00:08 -08:00
2016-01-10 04:57:08 -08:00
server . trigger ( 'error' , err ) ;
}
) ;
socket . on (
'data' ,
function ( data , UDPSocket ) {
let sock = ( ( server . udp4 || server . udp6 ) ? UDPSocket : socket ) ;
if ( server . config . rawBuffer ) {
data = new Buffer ( data , this . encoding ) ;
2016-01-04 01:00:08 -08:00
server . trigger (
2016-01-10 04:57:08 -08:00
'data' ,
data ,
sock
2016-01-10 04:18:14 -08:00
) ;
2016-01-10 04:57:08 -08:00
return ;
2016-01-04 01:00:08 -08:00
}
2016-01-04 01:10:58 -08:00
2016-01-10 04:57:08 -08:00
if ( ! this . ipcBuffer ) {
this . ipcBuffer = '' ;
}
2016-01-04 01:00:08 -08:00
2016-01-10 04:57:08 -08:00
data = ( this . ipcBuffer += data ) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
if ( data . slice ( - 1 ) != eventParser . delimiter || data . indexOf ( eventParser . delimiter ) == - 1 ) {
server . log ( 'Implementing larger buffer for this socket message. You may want to consider smaller messages' . notice ) ;
return ;
2016-01-10 04:32:46 -08:00
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
this . ipcBuffer = '' ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
data = eventParser . parse ( data ) ;
2016-01-04 01:00:08 -08:00
2016-01-10 04:57:08 -08:00
while ( data . length > 0 ) {
let message = new Message ;
message . load ( data . shift ( ) ) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . log ( 'received event of : ' . debug , message . type . data , message . data ) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
if ( message . data . id )
sock . id = message . data . id ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . trigger (
message . type ,
message . data ,
sock
) ;
}
}
) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
socket . on (
'message' ,
function ( msg , rinfo ) {
if ( ! rinfo ) {
2015-08-22 22:46:55 -07:00
return ;
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . log ( 'Received UDP message from ' . debug , rinfo . address . variable , rinfo . port ) ;
let data ;
if ( server . config . rawSocket ) {
data = new Buffer ( msg , this . encoding ) ;
} else {
data = msg . toString ( ) ;
}
socket . emit ( 'data' , data , rinfo ) ;
2014-02-25 17:15:43 -08:00
}
2016-01-10 04:57:08 -08:00
) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . trigger (
'connect' ,
socket
) ;
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
if ( server . config . rawBuffer ) {
return ;
}
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
function started ( socket ) {
server . onStart ( socket ) ;
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
if ( ! port ) {
server . log ( 'starting server as' . debug , 'Unix || Windows Socket' . variable ) ;
if ( process . platform === 'win32' ) {
server . path = server . path . replace ( /^\// , '' ) ;
server . path = server . path . replace ( /\//g , '-' ) ;
server . path = '\\\\.\\pipe\\' + server . path ;
}
2015-09-27 03:32:14 -07:00
2016-01-10 04:57:08 -08:00
server . server . listen (
server . path ,
started
) ;
return ;
}
if ( ! server . udp4 && ! server . udp6 ) {
server . log ( 'starting server as' . debug , ( server . config . tls ? 'TLS' : 'TCP' ) . variable ) ;
server . server . listen (
server . port ,
server . path ,
started
) ;
return ;
2014-02-22 01:13:31 -08:00
}
2016-01-10 04:57:08 -08:00
server . log ( 'starting server as' . debug , ( ( server . udp4 ) ? 'udp4' : 'udp6' ) . variable ) ;
server . server . bind (
server . port ,
server . path
) ;
started (
{
address : server . path ,
port : server . port
}
) ;
}
2014-02-22 01:13:31 -08:00
) ;
}
} ;
2015-09-27 03:32:14 -07:00
2016-01-10 03:53:33 -08:00
new Pubsub ( server ) ;
2015-09-27 03:32:14 -07:00
2014-02-22 01:13:31 -08:00
server . on (
2015-09-27 03:32:14 -07:00
'close' ,
2014-02-22 01:13:31 -08:00
function ( ) {
2016-01-10 04:18:14 -08:00
for ( let i = 0 , count = server . sockets . length ; i < count ; i ++ ) {
let socket = server . sockets [ i ] ;
let destroyedSocketId = false ;
2015-09-27 03:32:14 -07:00
2014-03-01 02:30:26 -08:00
if ( socket ) {
2016-01-10 02:39:14 -08:00
if ( socket . readable ) {
2015-09-27 03:32:14 -07:00
continue ;
2016-01-10 02:39:14 -08:00
}
2014-03-01 02:30:26 -08:00
}
2015-09-27 03:32:14 -07:00
2016-01-10 02:39:14 -08:00
if ( socket . id ) {
2016-01-10 04:18:14 -08:00
destroyedSocketId = socket . id ;
2016-01-10 02:39:14 -08:00
}
2015-09-27 03:32:14 -07:00
2014-08-27 20:57:37 -07:00
server . log ( 'socket disconnected' . notice , ' ' + destroyedSocketId . variable ) ;
2014-08-26 20:15:53 -07:00
2016-01-10 02:39:14 -08:00
if ( socket && socket . destroy ) {
2014-03-01 02:30:26 -08:00
socket . destroy ( ) ;
2016-01-10 02:39:14 -08:00
}
2015-09-27 03:32:14 -07:00
2014-03-01 02:30:26 -08:00
server . sockets . splice ( i , 1 ) ;
2014-08-26 20:15:53 -07:00
2014-08-27 20:57:37 -07:00
server . trigger ( 'socket.disconnected' , socket , destroyedSocketId ) ;
2014-08-26 20:15:53 -07:00
2014-02-22 01:13:31 -08:00
return ;
}
2015-09-27 03:32:14 -07:00
}
2014-08-27 20:41:12 -07:00
) ;
2014-02-22 01:13:31 -08:00
return server ;
}
2014-02-27 12:31:40 -08:00
module . exports = init ;