'use strict'; const net = require('net'), tls = require('tls'), fs = require('fs'), dgram = require('dgram'), eventParser = require('./eventParser.js'), Message = require('js-message'); let Events = require('event-pubsub/es5'); if(process.version[1]>4){ Events = require('event-pubsub'); } class Server extends Events{ constructor(path,config,log,port){ super(); Object.assign( this, { config : config, path : path, port : port, udp4 : false, udp6 : false, log : log, server : false, sockets : [], emit : emit, broadcast : broadcast, of : {} } ); this.on( 'close', serverClosed.bind(this) ); } onStart(socket){ this.trigger( 'start', socket ); } stop(){ this.server.close(); } start(){ if(!this.path){ this.log('Socket Server Path not specified, refusing to start'); return; } fs.unlink( this.path, startServer.bind(this) ); } } function emit(sockets, type, data){ if(! (sockets instanceof Array)){ sockets=[sockets]; } for(const socket of sockets){ this.log('dispatching event to socket', ' : ', type, data); let message=new Message; message.type=type; message.data=data; if(this.config.rawBuffer){ this.log(this.config.encoding) message=new Buffer(type,this.config.encoding); }else{ message=eventParser.format(message); } if(this.udp4 || this.udp6){ if(!socket.address || !socket.port){ this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets'); this.broadcast(type,data); return; } this.server.write( message, socket ); return; } socket.write(message); } } function broadcast(type,data){ this.log('broadcasting event to all known sockets listening to ', this.path,' : ', ((this.port)?this.port:''), type, data); let message=new Message; message.type=type; message.data=data; if(this.config.rawBuffer){ message=new Buffer(type,this.config.encoding); }else{ message=eventParser.format(message); } if(this.udp4 || this.udp6){ for(let i=1, count=this.sockets.length; i-1){ group.splice(index,1); if(group.length<1){ delete this.of[socket.id] } } } } this.log('socket disconnected',destroyedSocketId.toString()); if(socket && socket.destroy){ socket.destroy(); } this.sockets.splice(i,1); this.publish('socket.disconnected', socket, destroyedSocketId); return; } } function gotData(socket,data,UDPSocket){ let sock=((this.udp4 || this.udp6)? UDPSocket : socket); if(this.config.rawBuffer){ data=new Buffer(data,this.config.encoding); this.publish( 'data', data, sock ); return; } if(!this.ipcBuffer){ this.ipcBuffer=''; } data=(this.ipcBuffer+=data); if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){ this.log('Messages are large, You may want to consider smaller messages.'); return; } this.ipcBuffer=''; data=eventParser.parse(data); while(data.length>0){ let message=new Message; message.load(data.shift()); if (!sock.id && message.data && message.data.id){ sock.id=message.data.id; if(!this.of[sock.id]){ this.of[sock.id]=[]; } this.of[sock.id].push(sock); } this.log('received event of : ',message.type,message.data); this.publish( message.type, message.data, sock ); } } function socketClosed(socket){ this.publish( 'close', socket ); } function serverCreated(socket) { this.sockets.push(socket); if(socket.setEncoding){ socket.setEncoding(this.config.encoding); } this.log('## socket connection to server detected ##'); socket.on( 'close', socketClosed.bind(this) ); socket.on( 'error', function(err){ this.log('server socket error',err); this.publish('error',err); }.bind(this) ); socket.on( 'data', gotData.bind(this,socket) ); socket.on( 'message', function(msg,rinfo) { if (!rinfo){ return; } this.log('Received UDP message from ', rinfo.address, rinfo.port); let data; if(this.config.rawSocket){ data=new Buffer(msg,this.config.encoding); }else{ data=msg.toString(); } socket.emit('data',data,rinfo); }.bind(this) ); this.publish( 'connect', socket ); if(this.config.rawBuffer){ return; } } function startServer() { //persist scope through event bindings const server=this; this.log( 'starting server on ',this.path, ((this.port)?`:${this.port}`:'') ); if(!this.udp4 && !this.udp6){ this.log('starting TLS server',this.config.tls); if(!this.config.tls){ this.server=net.createServer( serverCreated.bind(this) ); }else{ startTLSServer.bind(this)(); } }else{ this.server=dgram.createSocket( ((this.udp4)? 'udp4':'udp6') ); this.server.write=UDPWrite.bind(this); this.server.on( 'listening', function UDPServerStarted() { serverCreated.bind(this)(this.server); }.bind(this) ); } this.server.on( 'error', function(err){ server.log('server error',err); console.log(server) server.publish( 'error', err ); } ); this.server.maxConnections=this.config.maxConnections; if(!this.port){ this.log('starting server as', 'Unix || Windows Socket'); if (process.platform ==='win32'){ this.path = this.path.replace(/^\//, ''); this.path = this.path.replace(/\//g, '-'); this.path= `\\\\.\\pipe\\${this.path}`; } this.server.listen( this.path, this.onStart.bind(this) ); return; } if(!this.udp4 && !this.udp6){ this.log('starting server as', (this.config.tls?'TLS':'TCP')); this.server.listen( this.port, this.path, this.onStart.bind(this) ); return; } this.log('starting server as',((this.udp4)? 'udp4':'udp6')); this.server.bind( this.port, this.path ); this.onStart( { address : this.path, port : this.port } ); } function startTLSServer(){ this.log('starting TLS server',this.config.tls); if(this.config.tls.private){ this.config.tls.key=fs.readFileSync(this.config.tls.private); }else{ this.config.tls.key=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/private/server.key`); } if(this.config.tls.public){ this.config.tls.cert=fs.readFileSync(this.config.tls.public); }else{ this.config.tls.cert=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/server.pub`); } if(this.config.tls.dhparam){ this.config.tls.dhparam=fs.readFileSync(this.config.tls.dhparam); } if(this.config.tls.trustedConnections){ if(typeof this.config.tls.trustedConnections === 'string'){ this.config.tls.trustedConnections=[this.config.tls.trustedConnections]; } this.config.tls.ca=[]; for(let i=0; i