node-ipc/dao/client.js

259 lines
7.1 KiB
JavaScript

import net from 'net';
import tls from 'tls';
import EventParser from '../entities/EventParser.js';
import Message from 'js-message';
import fs from 'fs';
import Queue from 'js-queue';
import Events from 'event-pubsub';
let eventParser = new EventParser();
class Client extends Events{
constructor(config,log){
super();
this.config=config;
this.log=log;
this.publish=super.emit;
(config.maxRetries)? this.retriesRemaining=config.maxRetries:0;
eventParser=new EventParser(this.config);
}
Client=Client;
queue =new Queue;
socket=false;
connect=connect;
emit=emit;
retriesRemaining=0;
explicitlyDisconnected=false;
}
function emit(type,data){
this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);
let message=new Message;
message.type=type;
message.data=data;
if(this.config.rawBuffer){
message=Buffer.from(type,this.config.encoding);
}else{
message=eventParser.format(message);
}
//volitile emit
if(!this.config.sync){
this.socket.write(message);
return;
}
//sync, non-volitile, ack emit
this.queue.add(
syncEmit.bind(this,message)
);
}
function syncEmit(message){
this.log('dispatching event to ', this.id, this.path, ' : ', message);
this.socket.write(message);
}
function connect(){
//init client object for scope persistance especially inside of socket events.
let client=this;
client.log('requested connection to ', client.id, client.path);
if(!this.path){
client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
return;
}
const options={};
if(!client.port){
client.log('Connecting client on Unix Socket :', client.path);
options.path=client.path;
if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
options.path = options.path.replace(/^\//, '');
options.path = options.path.replace(/\//g, '-');
options.path= `\\\\.\\pipe\\${options.path}`;
}
client.socket = net.connect(options);
}else{
options.host=client.path;
options.port=client.port;
if(client.config.interface.localAddress){
options.localAddress=client.config.interface.localAddress;
}
if(client.config.interface.localPort){
options.localPort=client.config.interface.localPort;
}
if(client.config.interface.family){
options.family=client.config.interface.family;
}
if(client.config.interface.hints){
options.hints=client.config.interface.hints;
}
if(client.config.interface.lookup){
options.lookup=client.config.interface.lookup;
}
if(!client.config.tls){
client.log('Connecting client via TCP to', options);
client.socket = net.connect(options);
}else{
client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);
if(client.config.tls.private){
client.config.tls.key=fs.readFileSync(client.config.tls.private);
}
if(client.config.tls.public){
client.config.tls.cert=fs.readFileSync(client.config.tls.public);
}
if(client.config.tls.trustedConnections){
if(typeof client.config.tls.trustedConnections === 'string'){
client.config.tls.trustedConnections=[client.config.tls.trustedConnections];
}
client.config.tls.ca=[];
for(let i=0; i<client.config.tls.trustedConnections.length; i++){
client.config.tls.ca.push(
fs.readFileSync(client.config.tls.trustedConnections[i])
);
}
}
Object.assign(client.config.tls,options);
client.socket = tls.connect(
client.config.tls
);
}
}
client.socket.setEncoding(this.config.encoding);
client.socket.on(
'error',
function(err){
client.log('\n\n######\nerror: ', err);
client.publish('error', err);
}
);
client.socket.on(
'connect',
function connectionMade(){
client.publish('connect');
client.retriesRemaining=client.config.maxRetries;
client.log('retrying reset');
}
);
client.socket.on(
'close',
function connectionClosed(){
client.log('connection closed' ,client.id , client.path,
client.retriesRemaining, 'tries remaining of', client.config.maxRetries
);
if(
client.config.stopRetrying ||
client.retriesRemaining<1 ||
client.explicitlyDisconnected
){
client.publish('disconnect');
client.log(
(client.config.id),
'exceeded connection rety amount of',
' or stopRetrying flag set.'
);
client.socket.destroy();
client.publish('destroy');
client=undefined;
return;
}
setTimeout(
function retryTimeout(){
if (client.explicitlyDisconnected) {
return;
}
client.retriesRemaining--;
client.connect();
}.bind(null,client),
client.config.retry
);
client.publish('disconnect');
}
);
client.socket.on(
'data',
function(data) {
client.log('## received events ##');
if(client.config.rawBuffer){
client.publish(
'data',
Buffer.from(data,client.config.encoding)
);
if(!client.config.sync){
return;
}
client.queue.next();
return;
}
if(!this.ipcBuffer){
this.ipcBuffer='';
}
data=(this.ipcBuffer+=data);
if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){
client.log('Messages are large, You may want to consider smaller messages.');
return;
}
this.ipcBuffer='';
const events = eventParser.parse(data);
const eCount = events.length;
for(let i=0; i<eCount; i++){
let message=new Message;
message.load(events[i]);
client.log('detected event', message.type, message.data);
client.publish(
message.type,
message.data
);
}
if(!client.config.sync){
return;
}
client.queue.next();
}
);
}
export {
Client as default,
Client
};