node-ipc/dao/socketServer.js
Kewin Brandsma c48c203f43 wip
2016-10-04 21:43:33 +02:00

428 lines
10 KiB
JavaScript

'use strict';
const net = require('net'),
tls = require('tls'),
fs = require('fs'),
dgram = require('dgram'),
eventParser = require('./eventParser.js'),
Events = require('event-pubsub'),
Message = require('js-message'),
Client = require('../dao/client.js');
/**
 * Socket server wrapper that extends the `event-pubsub` Events class.
 *
 * Holds the server configuration and the list of connected sockets, and
 * delegates the actual transport setup to startServer().
 */
class Server extends Events{
    /**
     * @param {string} path - socket path; when `port` is set it is handed to
     *        listen()/bind() as the host instead (see startServer)
     * @param {Object} config - server configuration (encoding, rawBuffer, tls, ...)
     * @param {Function} log - logging function called with variadic arguments
     * @param {number} [port] - port for TCP/TLS/UDP servers; falsy for a
     *        Unix/Windows socket
     */
    constructor(path,config,log,port){
        super();

        Object.assign(
            this,
            {
                config : config,
                path : path,
                port : port,
                udp4 : false,
                udp6 : false,
                log : log,
                server : false,
                sockets : [],
                // Own properties: these take precedence over any same-named
                // methods inherited from Events.
                emit : emit,
                broadcast : broadcast,
                of : {}
            }
        );

        this.on(
            'close',
            serverClosed.bind(this)
        );
    }

    /**
     * Announce that the server is up by firing a 'start' event.
     * NOTE(review): `trigger` is not defined in this file; presumably it is
     * inherited from Events - confirm.
     *
     * @param {*} socket - value forwarded to 'start' listeners
     */
    onStart(socket){
        this.trigger(
            'start',
            socket
        );
    }

    /**
     * Close the underlying server. Does nothing (apart from logging) when the
     * server was never created, instead of throwing on `false.close()`.
     */
    stop(){
        if(!this.server){
            this.log('Socket Server not started, nothing to stop');
            return;
        }

        this.server.close();
    }

    /**
     * Start the server. Refuses to start without a path.
     *
     * For a filesystem socket (no port) any stale socket file is unlinked
     * first. With a port, `path` is a host name, so nothing is unlinked:
     * doing so could delete an unrelated file of that name in the working
     * directory. Start-up stays asynchronous in both cases.
     */
    start(){
        if(!this.path){
            this.log('Socket Server Path not specified, refusing to start');
            return;
        }

        if(this.port){
            setImmediate(startServer.bind(this));
            return;
        }

        fs.unlink(
            this.path,
            startServer.bind(this)
        );
    }
}
/**
 * Send one event to a single client. Must be called with the Server as `this`.
 *
 * In raw-buffer mode `type` itself is the payload and `data` is ignored;
 * otherwise `type`/`data` are wrapped in a Message and serialised with
 * eventParser.format().
 *
 * @param {Object} socket - stream socket with `write()`, or for UDP an
 *        object carrying the target `address` and `port`
 * @param {*} type - event name (or the raw payload in raw-buffer mode)
 * @param {*} [data] - event payload
 */
function emit(socket, type, data){
    this.log('dispatching event to socket', ' : ', type, data);

    let message;

    if(this.config.rawBuffer){
        // Buffer.from rejects numbers, whereas the deprecated `new Buffer(n)`
        // would hand back n bytes of uninitialised memory.
        message=Buffer.from(type,this.config.encoding);
    }else{
        const event=new Message;
        event.type=type;
        event.data=data;
        message=eventParser.format(event);
    }

    if(this.udp4 || this.udp6){
        if(!socket.address || !socket.port){
            this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets');
            this.broadcast(type,data);
            return;
        }

        // UDP has no per-client stream; send through the server socket.
        this.server.write(
            message,
            socket
        );
        return;
    }

    socket.write(message);
}
/**
 * Send one event to every socket in `this.sockets`. Must be called with the
 * Server as `this`.
 *
 * In raw-buffer mode `type` itself is the payload and `data` is ignored;
 * otherwise `type`/`data` are wrapped in a Message and serialised with
 * eventParser.format().
 *
 * @param {*} type - event name (or the raw payload in raw-buffer mode)
 * @param {*} [data] - event payload
 */
function broadcast(type,data){
    this.log('broadcasting event to all known sockets listening to ', this.path,' : ', ((this.port)?this.port:''), type, data);

    let message;

    if(this.config.rawBuffer){
        // Buffer.from rejects numbers, whereas the deprecated `new Buffer(n)`
        // would hand back n bytes of uninitialised memory.
        message=Buffer.from(type,this.config.encoding);
    }else{
        const event=new Message;
        event.type=type;
        event.data=data;
        message=eventParser.format(event);
    }

    if(this.udp4 || this.udp6){
        // Index 0 is skipped on purpose: for UDP, serverCreated() pushes the
        // datagram server socket itself onto `this.sockets` once it is
        // listening, so the first entry is presumably the server, not a peer.
        for(let i=1, count=this.sockets.length; i<count; i++){
            this.server.write(message,this.sockets[i]);
        }
        return;
    }

    for(let i=0, count=this.sockets.length; i<count; i++){
        this.sockets[i].write(message);
    }
}
/**
 * 'close' listener registered in the Server constructor. Must run with the
 * Server as `this`.
 *
 * Finds the first entry in `this.sockets` that is missing or no longer
 * readable, destroys it, removes it from the list and publishes
 * 'socket.disconnected' with the socket and its id (`false` when it has
 * none). Only one socket is handled per call.
 */
function serverClosed(){
    for(let i=0, count=this.sockets.length; i<count; i++){
        const socket=this.sockets[i];

        // Still-readable sockets are alive; keep looking.
        if(socket && socket.readable){
            continue;
        }

        // Guard every dereference: the entry itself may be falsy.
        const destroyedSocketId=(socket && socket.id)? socket.id : false;

        this.log('socket disconnected',destroyedSocketId.toString());

        if(socket && socket.destroy){
            socket.destroy();
        }

        this.sockets.splice(i,1);

        this.publish('socket.disconnected', socket, destroyedSocketId);

        return;
    }
}
/**
 * 'data' listener for a connected socket. Must run with the Server as `this`.
 *
 * Raw-buffer mode: republishes the chunk as a Buffer under the 'data' event.
 * Otherwise: accumulates chunks until the buffered text ends with
 * `eventParser.delimiter`, then parses it and publishes each message under
 * its own type.
 *
 * @param {Object} socket - the stream socket the listener was bound to
 * @param {string|Buffer} data - received chunk
 * @param {Object} [UDPSocket] - sender info, used instead of `socket` when the
 *        server runs in UDP mode
 */
function gotData(socket,data,UDPSocket){
    const sock=((this.udp4 || this.udp6)? UDPSocket : socket);

    if(this.config.rawBuffer){
        this.publish(
            'data',
            Buffer.from(data,this.config.encoding),
            sock
        );
        return;
    }

    // Keep the reassembly buffer on the sender, not on the server: a shared
    // buffer would splice together partial messages from different sockets.
    if(!sock.ipcBuffer){
        sock.ipcBuffer='';
    }

    const buffered=(sock.ipcBuffer+=data);

    if(buffered.slice(-1)!=eventParser.delimiter || buffered.indexOf(eventParser.delimiter) == -1){
        this.log('Messages are large, You may want to consider smaller messages.');
        return;
    }

    sock.ipcBuffer='';

    const events=eventParser.parse(buffered);

    while(events.length>0){
        const message=new Message;
        message.load(events.shift());

        this.log('received event of : ',message.type,message.data);

        // Messages may legitimately carry no data; only adopt an id if present.
        if(message.data && message.data.id){
            sock.id=message.data.id;
        }

        this.publish(
            message.type,
            message.data,
            sock
        );
    }
}
/**
 * Per-socket 'close' listener: re-publishes the event on the Server as
 * 'close'. Must run with the Server as `this`.
 *
 * @param {*} socket - forwarded unchanged to 'close' subscribers.
 *        NOTE(review): serverCreated() registers this listener without
 *        binding a socket, so the value is whatever the emitter passes with
 *        its 'close' event - confirm it is really a socket.
 */
function socketClosed(socket){
    this.publish('close', socket);
}
/**
 * Connection handler: registers a new socket with the Server and wires up
 * its listeners. Must run with the Server as `this`.
 *
 * Unless `config.noHandshake` is set, the socket is asked to identify itself
 * ('__identify'); once it answers, a Client is created under `this.of[id]`
 * and 'connect' is published with the socket and that client. If no answer
 * arrives within 2 seconds an 'error' is published.
 *
 * @param {Object} socket - the accepted stream socket, or the datagram
 *        server socket itself in UDP mode
 */
function serverCreated(socket) {
    this.sockets.push(socket);

    if(socket.setEncoding){
        socket.setEncoding(this.config.encoding);
    }

    this.log('## socket connection to server detected ##');

    socket.on(
        'close',
        socketClosed.bind(this)
    );

    socket.on(
        'error',
        (err) => {
            this.log('server socket error',err);
            this.publish('error',err);
        }
    );

    socket.on(
        'data',
        gotData.bind(this,socket)
    );

    // Datagram sockets deliver 'message' rather than 'data'; normalise to
    // 'data' and pass the sender info along as the extra argument.
    socket.on(
        'message',
        (msg,rinfo) => {
            if (!rinfo){
                return;
            }

            this.log('Received UDP message from ', rinfo.address, rinfo.port);

            let data;
            // The rest of this file names the option `rawBuffer`; `rawSocket`
            // is still honoured for backward compatibility.
            if(this.config.rawBuffer || this.config.rawSocket){
                data=Buffer.from(msg,this.config.encoding);
            }else{
                data=msg.toString();
            }

            socket.emit('data',data,rinfo);
        }
    );

    if (this.config.noHandshake) {
        this.publish(
            'connect',
            socket
        );
        return;
    }

    // Wait for the handshake.
    const handshakeTimer = setTimeout(
        () => this.publish('error', 'Child connection did not finish handshake in time'),
        2000
    );

    // Handshake callback; ignores answers that belong to other sockets.
    const identify = (clientDetails, _socket) => {
        if (_socket !== socket){
            return;
        }

        clearTimeout(handshakeTimer);

        // One-shot: stop listening once this socket has identified itself.
        this.off('__identify', identify);

        const id = clientDetails.id;
        const path = clientDetails.path;

        // Copy the config: assigning into `this.config` would overwrite the
        // server's own id/path on every client handshake.
        const clientConfig = Object.assign({}, this.config, {id: id, path: path});

        this.of[id] = new Client(clientConfig, this.log, socket);
        this.of[id].id = id;
        this.of[id].path = path;

        // Arrow function so `this` is the Server whose registry is cleaned up.
        this.of[id].on('disconnect', () => {
            delete this.of[id];
        });

        this.publish('connect', socket, this.of[id]);
    };

    this.on('__identify', identify);
    this.emit(socket, '__identify');
}
/**
 * Create the underlying server for the configured transport and start
 * listening. Must run with the Server as `this`.
 *
 * Transport selection:
 *  - `udp4`/`udp6` set: datagram socket, bound to `port`/`path`
 *  - `config.tls` set:  TLS server created by startTLSServer()
 *  - otherwise:         plain `net` server
 * Without a port the server listens on `path` as a Unix socket (rewritten to
 * a named pipe on Windows); with a port, `path` is passed as the host.
 */
function startServer() {
    this.log(
        'starting server on ',this.path,
        ((this.port)?`:${this.port}`:'')
    );

    const isUDP=Boolean(this.udp4 || this.udp6);

    if(isUDP){
        this.server=dgram.createSocket(
            ((this.udp4)? 'udp4':'udp6')
        );
        this.server.write=UDPWrite.bind(this);
        // There are no per-client connections for UDP, so the server socket
        // itself is registered once it is listening.
        this.server.on(
            'listening',
            () => {
                serverCreated.call(this,this.server);
            }
        );
    }else if(this.config.tls){
        // Must actually be invoked: a bare `.bind(this)` only builds a
        // function and would leave `this.server` unset.
        startTLSServer.call(this);
    }else{
        this.log('starting TCP server',this.config.tls);
        this.server=net.createServer(
            serverCreated.bind(this)
        );
    }

    // Arrow function: the listener needs the Server's log/publish, not the
    // emitting socket's `this`.
    this.server.on(
        'error',
        (err) => {
            this.log('server error',err);
            this.publish(
                'error',
                err
            );
        }
    );

    this.server.maxConnections=this.config.maxConnections;

    if(!this.port){
        this.log('starting server as', 'Unix || Windows Socket');
        if (process.platform ==='win32'){
            // Turn the Unix-style path into a named-pipe path.
            this.path = this.path.replace(/^\//, '');
            this.path = this.path.replace(/\//g, '-');
            this.path= `\\\\.\\pipe\\${this.path}`;
        }

        this.server.listen(
            this.path,
            this.onStart.bind(this)
        );

        return;
    }

    if(!isUDP){
        this.log('starting server as', (this.config.tls?'TLS':'TCP'));
        this.server.listen(
            this.port,
            this.path,
            this.onStart.bind(this)
        );
        return;
    }

    this.log('starting server as',((this.udp4)? 'udp4':'udp6'));

    this.server.bind(
        this.port,
        this.path
    );

    this.onStart(
        {
            address : this.path,
            port : this.port
        }
    );
}
/**
 * Load the TLS material named in `config.tls` and create the TLS server.
 * Must run with the Server as `this`.
 *
 * Reads (synchronously) and stores on `config.tls`:
 *  - `key`  from `private`, or the bundled default server key
 *  - `cert` from `public`, or the bundled default server certificate
 *  - `dhparam` (replaced in place by the file contents)
 *  - `ca` from every path in `trustedConnections` (a single string is
 *    normalised to a one-element array)
 * The resulting options object is passed to tls.createServer().
 */
function startTLSServer(){
    this.log('starting TLS server',this.config.tls);

    const options=this.config.tls;

    options.key=fs.readFileSync(
        options.private || `${__dirname}/../local-node-ipc-certs/private/server.key`
    );

    options.cert=fs.readFileSync(
        options.public || `${__dirname}/../local-node-ipc-certs/server.pub`
    );

    // `dhparam` is overwritten with the file contents, so only treat it as a
    // path while it is still a string; otherwise a second start would try to
    // open a file named after the already-loaded parameters.
    if(typeof options.dhparam === 'string' && options.dhparam){
        options.dhparam=fs.readFileSync(options.dhparam);
    }

    if(options.trustedConnections){
        if(typeof options.trustedConnections === 'string'){
            options.trustedConnections=[options.trustedConnections];
        }

        options.ca=[];
        for(let i=0; i<options.trustedConnections.length; i++){
            options.ca.push(
                fs.readFileSync(options.trustedConnections[i])
            );
        }
    }

    this.server=tls.createServer(
        options,
        serverCreated.bind(this)
    );
}
/**
 * `write()` implementation installed on the datagram server socket. Must run
 * with the Server as `this`.
 *
 * Encodes `message` with `config.encoding` and sends it as one datagram to
 * the given peer. Send failures are logged and published as 'error'.
 *
 * @param {string|Buffer} message - payload to send
 * @param {{address: string, port: number}} socket - destination peer
 */
function UDPWrite(message,socket){
    const data=Buffer.from(message, this.config.encoding);

    this.server.send(
        data,
        0,
        data.length,
        socket.port,
        socket.address,
        // Arrow function so `this` is still the Server inside the callback.
        (err) => {
            if(!err){
                return;
            }

            this.log('error writing data to socket',err);
            this.publish('error',err);
        }
    );
}
// The Server class is this module's only export.
module.exports=Server;