2016-01-10 23:57:08 +11:00
'use strict' ;
2016-01-10 23:18:14 +11:00
const net = require ( 'net' ) ,
tls = require ( 'tls' ) ,
fs = require ( 'fs' ) ,
dgram = require ( 'dgram' ) ,
2017-04-16 14:13:15 +10:00
EventParser = require ( '../entities/EventParser.js' ) ,
2015-09-27 20:32:14 +10:00
Message = require ( 'js-message' ) ;
2014-02-22 20:13:31 +11:00
2016-10-04 07:34:27 +11:00
let Events = require ( 'event-pubsub/es5' ) ;
if ( process . version [ 1 ] > 4 ) {
Events = require ( 'event-pubsub' ) ;
}
2017-04-16 15:17:25 +10:00
let eventParser = new EventParser ( ) ;
2016-09-30 23:00:28 +10:00
class Server extends Events {
constructor ( path , config , log , port ) {
2017-04-16 15:17:25 +10:00
super ( ) ;
2016-09-30 23:00:28 +10:00
Object . assign (
this ,
{
config : config ,
path : path ,
port : port ,
udp4 : false ,
udp6 : false ,
log : log ,
server : false ,
sockets : [ ] ,
emit : emit ,
broadcast : broadcast
}
) ;
2017-04-16 15:17:25 +10:00
eventParser = new EventParser ( this . config ) ;
2016-09-30 23:00:28 +10:00
this . on (
'close' ,
serverClosed . bind ( this )
) ;
}
onStart ( socket ) {
this . trigger (
'start' ,
socket
) ;
}
stop ( ) {
this . server . close ( ) ;
}
start ( ) {
if ( ! this . path ) {
this . log ( 'Socket Server Path not specified, refusing to start' ) ;
return ;
}
2017-05-25 03:38:42 +10:00
if ( this . config . unlink ) {
2017-05-25 03:36:40 +10:00
fs . unlink (
this . path ,
startServer . bind ( this )
) ;
} else {
startServer . bind ( this ) ( ) ;
}
2016-09-30 23:00:28 +10:00
}
}
2014-02-22 20:13:31 +11:00
function emit ( socket , type , data ) {
2016-03-23 18:07:37 +11:00
this . log ( 'dispatching event to socket' , ' : ' , type , data ) ;
2015-09-27 20:32:14 +10:00
2016-01-10 23:18:14 +11:00
let message = new Message ;
2015-09-27 20:32:14 +10:00
message . type = type ;
message . data = data ;
2015-08-23 15:46:55 +10:00
if ( this . config . rawBuffer ) {
2016-12-22 12:03:12 +11:00
this . log ( this . config . encoding )
2016-03-23 18:07:37 +11:00
message = new Buffer ( type , this . config . encoding ) ;
2015-08-23 15:46:55 +10:00
} else {
2015-09-27 20:32:14 +10:00
message = eventParser . format ( message ) ;
2015-08-23 15:46:55 +10:00
}
2014-02-27 09:04:09 +11:00
if ( this . udp4 || this . udp6 ) {
2015-09-27 20:32:14 +10:00
2014-02-27 09:04:09 +11:00
if ( ! socket . address || ! socket . port ) {
this . log ( 'Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets' ) ;
this . broadcast ( type , data ) ;
return ;
}
2015-09-27 20:32:14 +10:00
2014-02-27 09:04:09 +11:00
this . server . write (
2015-09-27 20:32:14 +10:00
message ,
2014-02-27 09:04:09 +11:00
socket
2015-08-23 15:46:55 +10:00
) ;
2014-02-27 09:04:09 +11:00
return ;
2016-01-10 23:18:14 +11:00
}
2015-09-27 20:32:14 +10:00
socket . write ( message ) ;
2016-01-10 23:18:14 +11:00
}
2014-02-22 20:13:31 +11:00
function broadcast ( type , data ) {
2016-03-23 18:07:37 +11:00
this . log ( 'broadcasting event to all known sockets listening to ' , this . path , ' : ' , ( ( this . port ) ? this . port : '' ) , type , data ) ;
2016-01-10 23:18:14 +11:00
let message = new Message ;
2015-09-27 20:32:14 +10:00
message . type = type ;
message . data = data ;
2015-08-23 15:46:55 +10:00
if ( this . config . rawBuffer ) {
2016-03-23 18:07:37 +11:00
message = new Buffer ( type , this . config . encoding ) ;
2015-08-23 15:46:55 +10:00
} else {
2015-09-27 20:32:14 +10:00
message = eventParser . format ( message ) ;
2015-08-23 15:46:55 +10:00
}
2014-02-27 09:04:09 +11:00
if ( this . udp4 || this . udp6 ) {
2016-01-10 23:18:14 +11:00
for ( let i = 1 , count = this . sockets . length ; i < count ; i ++ ) {
2015-09-27 20:32:14 +10:00
this . server . write ( message , this . sockets [ i ] ) ;
2014-02-27 09:04:09 +11:00
}
} else {
2016-01-10 23:18:14 +11:00
for ( let i = 0 , count = this . sockets . length ; i < count ; i ++ ) {
2015-09-27 20:32:14 +10:00
this . sockets [ i ] . write ( message ) ;
2014-02-27 09:04:09 +11:00
}
2014-02-22 20:13:31 +11:00
}
2016-01-10 23:18:14 +11:00
}
2014-02-22 20:13:31 +11:00
2016-09-30 23:00:28 +10:00
function serverClosed ( ) {
for ( let i = 0 , count = this . sockets . length ; i < count ; i ++ ) {
let socket = this . sockets [ i ] ;
let destroyedSocketId = false ;
if ( socket ) {
if ( socket . readable ) {
continue ;
2014-02-22 20:13:31 +11:00
}
2016-09-30 23:00:28 +10:00
}
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
if ( socket . id ) {
destroyedSocketId = socket . id ;
}
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
this . log ( 'socket disconnected' , destroyedSocketId . toString ( ) ) ;
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
if ( socket && socket . destroy ) {
socket . destroy ( ) ;
}
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
this . sockets . splice ( i , 1 ) ;
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
this . publish ( 'socket.disconnected' , socket , destroyedSocketId ) ;
2016-01-10 23:57:08 +11:00
2016-09-30 23:00:28 +10:00
return ;
}
}
2016-01-10 23:57:08 +11:00
2016-09-30 23:00:28 +10:00
function gotData ( socket , data , UDPSocket ) {
let sock = ( ( this . udp4 || this . udp6 ) ? UDPSocket : socket ) ;
if ( this . config . rawBuffer ) {
data = new Buffer ( data , this . config . encoding ) ;
this . publish (
'data' ,
data ,
sock
) ;
return ;
}
2016-01-10 23:57:08 +11:00
2016-09-30 23:00:28 +10:00
if ( ! this . ipcBuffer ) {
this . ipcBuffer = '' ;
}
data = ( this . ipcBuffer += data ) ;
if ( data . slice ( - 1 ) != eventParser . delimiter || data . indexOf ( eventParser . delimiter ) == - 1 ) {
this . log ( 'Messages are large, You may want to consider smaller messages.' ) ;
return ;
}
this . ipcBuffer = '' ;
data = eventParser . parse ( data ) ;
while ( data . length > 0 ) {
let message = new Message ;
message . load ( data . shift ( ) ) ;
2017-01-24 21:22:53 +11:00
2017-02-14 17:37:24 +11:00
// Only set the sock id if it is specified.
if ( message . data && message . data . id ) {
sock . id = message . data . id ;
2016-11-16 01:17:54 +11:00
}
2017-01-24 21:22:53 +11:00
2016-09-30 23:00:28 +10:00
this . log ( 'received event of : ' , message . type , message . data ) ;
this . publish (
message . type ,
message . data ,
sock
) ;
}
}
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
function socketClosed ( socket ) {
this . publish (
2015-09-27 20:32:14 +10:00
'close' ,
2016-09-30 23:00:28 +10:00
socket
) ;
}
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
function serverCreated ( socket ) {
this . sockets . push ( socket ) ;
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
if ( socket . setEncoding ) {
socket . setEncoding ( this . config . encoding ) ;
}
2014-08-27 13:15:53 +10:00
2016-09-30 23:00:28 +10:00
this . log ( '## socket connection to server detected ##' ) ;
socket . on (
'close' ,
socketClosed . bind ( this )
) ;
socket . on (
'error' ,
function ( err ) {
this . log ( 'server socket error' , err ) ;
2015-09-27 20:32:14 +10:00
2016-09-30 23:00:28 +10:00
this . publish ( 'error' , err ) ;
} . bind ( this )
) ;
2014-08-27 13:15:53 +10:00
2016-09-30 23:00:28 +10:00
socket . on (
'data' ,
gotData . bind ( this , socket )
) ;
2014-08-27 13:15:53 +10:00
2016-09-30 23:00:28 +10:00
socket . on (
'message' ,
function ( msg , rinfo ) {
if ( ! rinfo ) {
2014-02-22 20:13:31 +11:00
return ;
}
2016-09-30 23:00:28 +10:00
this . log ( 'Received UDP message from ' , rinfo . address , rinfo . port ) ;
let data ;
if ( this . config . rawSocket ) {
data = new Buffer ( msg , this . config . encoding ) ;
} else {
data = msg . toString ( ) ;
}
socket . emit ( 'data' , data , rinfo ) ;
} . bind ( this )
) ;
this . publish (
'connect' ,
socket
) ;
if ( this . config . rawBuffer ) {
return ;
}
}
function startServer ( ) {
this . log (
'starting server on ' , this . path ,
( ( this . port ) ? ` : ${ this . port } ` : '' )
) ;
if ( ! this . udp4 && ! this . udp6 ) {
2016-12-22 12:03:12 +11:00
this . log ( 'starting TLS server' , this . config . tls ) ;
2016-09-30 23:00:28 +10:00
if ( ! this . config . tls ) {
this . server = net . createServer (
serverCreated . bind ( this )
) ;
} else {
2016-12-03 20:31:03 +11:00
startTLSServer . bind ( this ) ( ) ;
2016-09-30 23:00:28 +10:00
}
} else {
this . server = dgram . createSocket (
( ( this . udp4 ) ? 'udp4' : 'udp6' )
) ;
this . server . write = UDPWrite . bind ( this ) ;
this . server . on (
'listening' ,
function UDPServerStarted ( ) {
serverCreated . bind ( this ) ( this . server ) ;
} . bind ( this )
) ;
}
this . server . on (
'error' ,
function ( err ) {
2017-05-19 07:22:56 +10:00
this . log ( 'server error' , err ) ;
2016-09-30 23:00:28 +10:00
2017-05-19 07:22:56 +10:00
this . publish (
2016-09-30 23:00:28 +10:00
'error' ,
err
) ;
2017-05-19 07:22:56 +10:00
} . bind ( this )
2014-08-28 13:41:12 +10:00
) ;
2014-02-22 20:13:31 +11:00
2016-09-30 23:00:28 +10:00
this . server . maxConnections = this . config . maxConnections ;
if ( ! this . port ) {
this . log ( 'starting server as' , 'Unix || Windows Socket' ) ;
if ( process . platform === 'win32' ) {
this . path = this . path . replace ( /^\// , '' ) ;
this . path = this . path . replace ( /\//g , '-' ) ;
this . path = ` \\ \\ . \\ pipe \\ ${ this . path } ` ;
}
this . server . listen (
this . path ,
this . onStart . bind ( this )
) ;
return ;
}
if ( ! this . udp4 && ! this . udp6 ) {
this . log ( 'starting server as' , ( this . config . tls ? 'TLS' : 'TCP' ) ) ;
this . server . listen (
this . port ,
this . path ,
this . onStart . bind ( this )
) ;
return ;
}
this . log ( 'starting server as' , ( ( this . udp4 ) ? 'udp4' : 'udp6' ) ) ;
this . server . bind (
this . port ,
this . path
) ;
this . onStart (
{
address : this . path ,
port : this . port
}
) ;
}
function startTLSServer ( ) {
this . log ( 'starting TLS server' , this . config . tls ) ;
if ( this . config . tls . private ) {
this . config . tls . key = fs . readFileSync ( this . config . tls . private ) ;
} else {
this . config . tls . key = fs . readFileSync ( ` ${ _ _dirname } /../local-node-ipc-certs/private/server.key ` ) ;
}
if ( this . config . tls . public ) {
this . config . tls . cert = fs . readFileSync ( this . config . tls . public ) ;
} else {
this . config . tls . cert = fs . readFileSync ( ` ${ _ _dirname } /../local-node-ipc-certs/server.pub ` ) ;
}
if ( this . config . tls . dhparam ) {
this . config . tls . dhparam = fs . readFileSync ( this . config . tls . dhparam ) ;
}
if ( this . config . tls . trustedConnections ) {
if ( typeof this . config . tls . trustedConnections === 'string' ) {
this . config . tls . trustedConnections = [ this . config . tls . trustedConnections ] ;
}
this . config . tls . ca = [ ] ;
for ( let i = 0 ; i < this . config . tls . trustedConnections . length ; i ++ ) {
this . config . tls . ca . push (
fs . readFileSync ( this . config . tls . trustedConnections [ i ] )
) ;
}
}
this . server = tls . createServer (
this . config . tls ,
serverCreated . bind ( this )
) ;
}
function UDPWrite ( message , socket ) {
let data = new Buffer ( message , this . config . encoding ) ;
this . server . send (
data ,
0 ,
data . length ,
socket . port ,
socket . address ,
function ( err , bytes ) {
if ( err ) {
this . log ( 'error writing data to socket' , err ) ;
this . publish (
'error' ,
function ( err ) {
this . publish ( 'error' , err ) ;
}
) ;
}
}
) ;
2014-02-22 20:13:31 +11:00
}
2016-09-30 23:00:28 +10:00
module . exports = Server ;