node-ipc/dao/socketServer.js

374 lines
14 KiB
JavaScript
Raw Normal View History

2016-01-10 23:57:08 +11:00
'use strict';
2016-01-10 23:18:14 +11:00
const net = require('net'),
tls = require('tls'),
fs = require('fs'),
dgram = require('dgram'),
2016-01-12 07:28:13 +11:00
eventParser = require('./eventParser.js'),
2016-01-10 23:18:14 +11:00
Pubsub = require('event-pubsub'),
Message = require('js-message');
2014-02-22 20:13:31 +11:00
function emit(socket, type, data){
this.log('dispatching event to socket'.debug, ' : ', type.data, data);
2016-01-10 23:18:14 +11:00
let message=new Message;
message.type=type;
message.data=data;
2015-08-23 15:46:55 +10:00
if(this.config.rawBuffer){
message=new Buffer(type,this.encoding);
2015-08-23 15:46:55 +10:00
}else{
message=eventParser.format(message);
2015-08-23 15:46:55 +10:00
}
2014-02-27 09:04:09 +11:00
if(this.udp4 || this.udp6){
2014-02-27 09:04:09 +11:00
if(!socket.address || !socket.port){
this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets');
this.broadcast(type,data);
return;
}
2014-02-27 09:04:09 +11:00
this.server.write(
message,
2014-02-27 09:04:09 +11:00
socket
2015-08-23 15:46:55 +10:00
);
2014-02-27 09:04:09 +11:00
return;
2016-01-10 23:18:14 +11:00
}
socket.write(message);
2016-01-10 23:18:14 +11:00
}
2014-02-22 20:13:31 +11:00
function broadcast(type,data){
2016-01-10 23:18:14 +11:00
this.log('broadcasting event to all known sockets listening to '.debug, this.path.variable,' : ', ((this.port)?this.port:''), type, data);
let message=new Message;
message.type=type;
message.data=data;
2015-08-23 15:46:55 +10:00
if(this.config.rawBuffer){
message=new Buffer(type,this.encoding);
2015-08-23 15:46:55 +10:00
}else{
message=eventParser.format(message);
2015-08-23 15:46:55 +10:00
}
2014-02-27 09:04:09 +11:00
if(this.udp4 || this.udp6){
2016-01-10 23:18:14 +11:00
for(let i=1, count=this.sockets.length; i<count; i++){
this.server.write(message,this.sockets[i]);
2014-02-27 09:04:09 +11:00
}
}else{
2016-01-10 23:18:14 +11:00
for(let i=0, count=this.sockets.length; i<count; i++){
this.sockets[i].write(message);
2014-02-27 09:04:09 +11:00
}
2014-02-22 20:13:31 +11:00
}
2016-01-10 23:18:14 +11:00
}
2014-02-22 20:13:31 +11:00
2014-02-26 12:15:43 +11:00
function init(path,config,log,port){
2016-01-10 23:18:14 +11:00
let server={
2014-02-22 20:13:31 +11:00
config : config,
path : path,
2014-02-26 12:15:43 +11:00
port : port,
2014-02-27 09:04:09 +11:00
udp4 : false,
udp6 : false,
2014-02-22 20:13:31 +11:00
log : log,
server : false,
sockets : [],
emit : emit,
broadcast : broadcast,
2016-01-10 23:57:08 +11:00
onStart : function onStart(socket){
2014-02-22 20:13:31 +11:00
this.trigger(
'start',
socket
);
},
2016-01-10 23:57:08 +11:00
stop:function stop(){
2016-01-04 20:33:55 +11:00
server.server.close();
},
2016-01-10 23:57:08 +11:00
start : function start(){
2014-02-22 20:13:31 +11:00
if(!this.path){
2014-03-12 04:44:37 +11:00
server.log('Socket Server Path not specified, refusing to start'.warn);
2014-02-22 20:13:31 +11:00
return;
}
2014-02-22 20:13:31 +11:00
fs.unlink(
this.path,
2016-01-10 23:57:08 +11:00
function () {
2016-01-11 16:10:58 +11:00
server.log(
'starting server on '.debug,server.path.variable,
((server.port)?`:${server.port}`:'').variable
);
2016-01-10 23:57:08 +11:00
if(!server.udp4 && !server.udp6){
if(!server.config.tls){
server.server=net.createServer(
serverCreated
);
}else{
server.log('starting TLS server'.debug,server.config.tls);
if(server.config.tls.private){
server.config.tls.key=fs.readFileSync(server.config.tls.private);
}else{
2016-01-11 16:10:58 +11:00
server.config.tls.key=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/private/server.key`);
2016-01-10 23:57:08 +11:00
}
if(server.config.tls.public){
server.config.tls.cert=fs.readFileSync(server.config.tls.public);
}else{
2016-01-11 16:10:58 +11:00
server.config.tls.cert=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/server.pub`);
2016-01-10 23:57:08 +11:00
}
if(server.config.tls.dhparam){
server.config.tls.dhparam=fs.readFileSync(server.config.tls.dhparam);
}
if(server.config.tls.trustedConnections){
if(typeof server.config.tls.trustedConnections === 'string'){
server.config.tls.trustedConnections=[server.config.tls.trustedConnections];
}
server.config.tls.ca=[];
for(let i=0; i<server.config.tls.trustedConnections.length; i++){
server.config.tls.ca.push(
fs.readFileSync(server.config.tls.trustedConnections[i])
);
}
2016-01-10 23:57:08 +11:00
}
server.server=tls.createServer(
server.config.tls,
serverCreated
);
}
}else{
function UDPWrite(message,socket){
let data=new Buffer(message, server.config.encoding);
server.server.send(
data,
0,
data.length,
socket.port,
socket.address,
function(err, bytes) {
if(err){
server.log('error writing data to socket'.warn,err);
server.trigger(
'error',
function(err){
server.trigger('error',err);
2014-02-22 20:13:31 +11:00
}
2016-01-10 23:57:08 +11:00
);
}
2014-02-22 20:13:31 +11:00
}
2016-01-10 23:57:08 +11:00
);
}
server.server=dgram.createSocket(
((server.udp4)? 'udp4':'udp6')
);
server.server.write=UDPWrite;
server.server.on(
'listening',
function () {
serverCreated(server.server);
}
);
}
2016-01-10 23:57:08 +11:00
server.server.on(
'error',
function(err){
server.log('server error'.warn,err);
server.trigger(
'error',
err
);
}
);
server.server.maxConnections=server.config.maxConnections;
function serverCreated(socket) {
server.sockets.push(socket);
if(socket.setEncoding){
socket.setEncoding(server.config.encoding);
}
server.log('## socket connection to server detected ##'.rainbow);
socket.on(
'close',
function(socket){
server.trigger(
'close',
socket
2014-02-27 09:04:09 +11:00
);
}
2016-01-10 23:57:08 +11:00
);
2016-01-10 23:57:08 +11:00
socket.on(
'error',
function(err){
server.log('server socket error'.warn,err);
2016-01-10 23:57:08 +11:00
server.trigger('error',err);
}
);
socket.on(
'data',
function(data,UDPSocket){
let sock=((server.udp4 || server.udp6)? UDPSocket : socket);
if(server.config.rawBuffer){
data=new Buffer(data,this.encoding);
server.trigger(
2016-01-10 23:57:08 +11:00
'data',
data,
sock
2016-01-10 23:18:14 +11:00
);
2016-01-10 23:57:08 +11:00
return;
}
2016-01-10 23:57:08 +11:00
if(!this.ipcBuffer){
this.ipcBuffer='';
}
2016-01-10 23:57:08 +11:00
data=(this.ipcBuffer+=data);
2016-01-10 23:57:08 +11:00
if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){
server.log('Implementing larger buffer for this socket message. You may want to consider smaller messages'.notice);
return;
2016-01-10 23:32:46 +11:00
}
2016-01-10 23:57:08 +11:00
this.ipcBuffer='';
2016-01-10 23:57:08 +11:00
data=eventParser.parse(data);
2016-01-10 23:57:08 +11:00
while(data.length>0){
let message=new Message;
message.load(data.shift());
2016-01-10 23:57:08 +11:00
server.log('received event of : '.debug,message.type.data,message.data);
2016-01-12 07:28:13 +11:00
if(message.data.id){
2016-01-10 23:57:08 +11:00
sock.id=message.data.id;
2016-01-12 07:28:13 +11:00
}
2016-01-10 23:57:08 +11:00
server.trigger(
message.type,
message.data,
sock
);
}
}
);
2016-01-10 23:57:08 +11:00
socket.on(
'message',
function(msg,rinfo) {
if (!rinfo){
2015-08-23 15:46:55 +10:00
return;
}
2016-01-10 23:57:08 +11:00
server.log('Received UDP message from '.debug, rinfo.address.variable, rinfo.port);
let data;
if(server.config.rawSocket){
data=new Buffer(msg,this.encoding);
}else{
data=msg.toString();
}
socket.emit('data',data,rinfo);
2014-02-26 12:15:43 +11:00
}
2016-01-10 23:57:08 +11:00
);
2016-01-10 23:57:08 +11:00
server.trigger(
'connect',
socket
);
2016-01-10 23:57:08 +11:00
if(server.config.rawBuffer){
return;
}
}
2016-01-10 23:57:08 +11:00
function started(socket){
server.onStart(socket);
}
2016-01-10 23:57:08 +11:00
if(!port){
server.log('starting server as'.debug, 'Unix || Windows Socket'.variable);
if (process.platform ==='win32'){
2016-01-11 16:10:58 +11:00
server.path = server.path.replace(/^\//, '');
server.path = server.path.replace(/\//g, '-');
server.path= `\\\\.\\pipe\\${server.path}`;
}
2016-01-10 23:57:08 +11:00
server.server.listen(
server.path,
started
);
return;
}
if(!server.udp4 && !server.udp6){
server.log('starting server as'.debug, (server.config.tls?'TLS':'TCP').variable);
server.server.listen(
server.port,
server.path,
started
);
return;
2014-02-22 20:13:31 +11:00
}
2016-01-10 23:57:08 +11:00
server.log('starting server as'.debug,((server.udp4)? 'udp4':'udp6').variable);
server.server.bind(
server.port,
server.path
);
started(
{
address : server.path,
port : server.port
}
);
}
2014-02-22 20:13:31 +11:00
);
}
};
2016-01-10 22:53:33 +11:00
new Pubsub(server);
2014-02-22 20:13:31 +11:00
server.on(
'close',
2014-02-22 20:13:31 +11:00
function(){
2016-01-10 23:18:14 +11:00
for(let i=0, count=server.sockets.length; i<count; i++){
let socket=server.sockets[i];
let destroyedSocketId=false;
2014-03-01 21:30:26 +11:00
if(socket){
2016-01-10 21:39:14 +11:00
if(socket.readable){
continue;
2016-01-10 21:39:14 +11:00
}
2014-03-01 21:30:26 +11:00
}
2016-01-10 21:39:14 +11:00
if(socket.id){
2016-01-10 23:18:14 +11:00
destroyedSocketId=socket.id;
2016-01-10 21:39:14 +11:00
}
2016-01-11 16:10:58 +11:00
server.log('socket disconnected'.notice,destroyedSocketId.toString().variable);
2016-01-10 21:39:14 +11:00
if(socket && socket.destroy){
2014-03-01 21:30:26 +11:00
socket.destroy();
2016-01-10 21:39:14 +11:00
}
2014-03-01 21:30:26 +11:00
server.sockets.splice(i,1);
server.trigger('socket.disconnected', socket, destroyedSocketId);
2014-02-22 20:13:31 +11:00
return;
}
}
);
2014-02-22 20:13:31 +11:00
return server;
}
module.exports=init;