ported client and updated new Buffer(type,enc) to Buffer.from(type,enc)

This commit is contained in:
RIAEvangelist 2019-12-04 21:57:50 -08:00
parent 24e1ea41f9
commit d7cf3e1da9
11 changed files with 292 additions and 269 deletions

View file

@ -1,253 +1,41 @@
'use strict';
const net = require('net'),
tls = require('tls'),
EventParser = require('../entities/EventParser.js'),
Message = require('js-message'),
fs = require('fs'),
//modules
const EventParser = require('../entities/EventParser.js'),
Queue = require('js-queue'),
Events = require('event-pubsub');
//members
const emit = require('./members/client/emit.js'),
connect = require('./members/client/connect.js');
let eventParser = new EventParser();
class Client extends Events{
Client = Client;
queue = new Queue;
socket = false;
connect = connect;
emit = emit;
explicitlyDisconnected = false;
retriesRemaining = 0
constructor(config,log){
super();
Object.assign(
this,
{
Client : Client,
config : config,
queue : new Queue,
socket : false,
connect : connect,
emit : emit,
log : log,
retriesRemaining:config.maxRetries||0,
explicitlyDisconnected: false
}
);
this.config=config;
this.log=log;
eventParser=new EventParser(this.config);
this.eventParser=eventParser;
if(!config.maxRetries){
return;
}
this.retriesRemaining = config.maxRetries||0;
}
}
function emit(type,data){
this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);
let message=new Message;
message.type=type;
message.data=data;
if(this.config.rawBuffer){
message=new Buffer(type,this.config.encoding);
}else{
message=eventParser.format(message);
}
if(!this.config.sync){
this.socket.write(message);
return;
}
this.queue.add(
syncEmit.bind(this,message)
);
}
function syncEmit(message){
this.log('dispatching event to ', this.id, this.path, ' : ', message);
this.socket.write(message);
}
function connect(){
//init client object for scope persistance especially inside of socket events.
let client=this;
client.log('requested connection to ', client.id, client.path);
if(!this.path){
client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
return;
}
const options={};
if(!client.port){
client.log('Connecting client on Unix Socket :', client.path);
options.path=client.path;
if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
options.path = options.path.replace(/^\//, '');
options.path = options.path.replace(/\//g, '-');
options.path= `\\\\.\\pipe\\${options.path}`;
}
client.socket = net.connect(options);
}else{
options.host=client.path;
options.port=client.port;
if(client.config.interface.localAddress){
options.localAddress=client.config.interface.localAddress;
}
if(client.config.interface.localPort){
options.localPort=client.config.interface.localPort;
}
if(client.config.interface.family){
options.family=client.config.interface.family;
}
if(client.config.interface.hints){
options.hints=client.config.interface.hints;
}
if(client.config.interface.lookup){
options.lookup=client.config.interface.lookup;
}
if(!client.config.tls){
client.log('Connecting client via TCP to', options);
client.socket = net.connect(options);
}else{
client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);
if(client.config.tls.private){
client.config.tls.key=fs.readFileSync(client.config.tls.private);
}
if(client.config.tls.public){
client.config.tls.cert=fs.readFileSync(client.config.tls.public);
}
if(client.config.tls.trustedConnections){
if(typeof client.config.tls.trustedConnections === 'string'){
client.config.tls.trustedConnections=[client.config.tls.trustedConnections];
}
client.config.tls.ca=[];
for(let i=0; i<client.config.tls.trustedConnections.length; i++){
client.config.tls.ca.push(
fs.readFileSync(client.config.tls.trustedConnections[i])
);
}
}
Object.assign(client.config.tls,options);
client.socket = tls.connect(
client.config.tls
);
}
}
client.socket.setEncoding(this.config.encoding);
client.socket.on(
'error',
function(err){
client.log('\n\n######\nerror: ', err);
client.publish('error', err);
}
);
client.socket.on(
'connect',
function connectionMade(){
client.publish('connect');
client.retriesRemaining=client.config.maxRetries;
client.log('retrying reset');
}
);
client.socket.on(
'close',
function connectionClosed(){
client.log('connection closed' ,client.id , client.path,
client.retriesRemaining, 'tries remaining of', client.config.maxRetries
);
if(
client.config.stopRetrying ||
client.retriesRemaining<1 ||
client.explicitlyDisconnected
){
client.publish('disconnect');
client.log(
(client.config.id),
'exceeded connection rety amount of',
' or stopRetrying flag set.'
);
client.socket.destroy();
client.publish('destroy');
client=undefined;
return;
}
setTimeout(
function retryTimeout(){
client.retriesRemaining--;
client.connect();
}.bind(null,client),
client.config.retry
);
client.publish('disconnect');
}
);
client.socket.on(
'data',
function(data) {
client.log('## received events ##');
if(client.config.rawBuffer){
client.publish(
'data',
new Buffer(data,client.config.encoding)
);
if(!client.config.sync){
return;
}
client.queue.next();
return;
}
if(!this.ipcBuffer){
this.ipcBuffer='';
}
data=(this.ipcBuffer+=data);
if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){
client.log('Messages are large, You may want to consider smaller messages.');
return;
}
this.ipcBuffer='';
const events = eventParser.parse(data);
const eCount = events.length;
for(let i=0; i<eCount; i++){
let message=new Message;
message.load(events[i]);
client.log('detected event', message.type, message.data);
client.publish(
message.type,
message.data
);
}
if(!client.config.sync){
return;
}
client.queue.next();
}
);
}
module.exports=Client;

View file

@ -0,0 +1,200 @@
'use strict';
const net = require('net'),
tls = require('tls'),
fs = require('fs'),
Message = require('js-message');
function connect(){
//init client object for scope persistance especially inside of socket events.
let client=this;
client.log('requested connection to ', client.id, client.path);
if(!this.path){
client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
return;
}
const options={};
if(!client.port){
client.log('Connecting client on Unix Socket :', client.path);
options.path=client.path;
if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
options.path = options.path.replace(/^\//, '');
options.path = options.path.replace(/\//g, '-');
options.path= `\\\\.\\pipe\\${options.path}`;
}
client.socket = net.connect(options);
}else{
options.host=client.path;
options.port=client.port;
if(client.config.interface.localAddress){
options.localAddress=client.config.interface.localAddress;
}
if(client.config.interface.localPort){
options.localPort=client.config.interface.localPort;
}
if(client.config.interface.family){
options.family=client.config.interface.family;
}
if(client.config.interface.hints){
options.hints=client.config.interface.hints;
}
if(client.config.interface.lookup){
options.lookup=client.config.interface.lookup;
}
if(!client.config.tls){
client.log('Connecting client via TCP to', options);
client.socket = net.connect(options);
}else{
client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);
if(client.config.tls.private){
client.config.tls.key=fs.readFileSync(client.config.tls.private);
}
if(client.config.tls.public){
client.config.tls.cert=fs.readFileSync(client.config.tls.public);
}
if(client.config.tls.trustedConnections){
if(typeof client.config.tls.trustedConnections === 'string'){
client.config.tls.trustedConnections=[client.config.tls.trustedConnections];
}
client.config.tls.ca=[];
for(let i=0; i<client.config.tls.trustedConnections.length; i++){
client.config.tls.ca.push(
fs.readFileSync(client.config.tls.trustedConnections[i])
);
}
}
client.config.tls={...client.config.tls,options};
client.socket = tls.connect(
client.config.tls
);
}
}
client.socket.setEncoding(this.config.encoding);
client.socket.on(
'error',
function(err){
client.log('\n\n######\nerror: ', err);
client.publish('error', err);
}
);
client.socket.on(
'connect',
function connectionMade(){
client.publish('connect');
client.retriesRemaining=client.config.maxRetries;
client.log('retrying reset');
}
);
client.socket.on(
'close',
function connectionClosed(){
client.log('connection closed' ,client.id , client.path,
client.retriesRemaining, 'tries remaining of', client.config.maxRetries
);
if(
client.config.stopRetrying ||
client.retriesRemaining<1 ||
client.explicitlyDisconnected
){
client.publish('disconnect');
client.log(
(client.config.id),
'exceeded connection rety amount of',
' or stopRetrying flag set.'
);
client.socket.destroy();
client.publish('destroy');
client=undefined;
return;
}
setTimeout(
function retryTimeout(){
client.retriesRemaining--;
client.connect();
}.bind(null,client),
client.config.retry
);
client.publish('disconnect');
}
);
client.socket.on(
'data',
function(data) {
client.log('## received events ##');
if(client.config.rawBuffer){
client.publish(
'data',
new Buffer(data,client.config.encoding)
);
if(!client.config.sync){
return;
}
client.queue.next();
return;
}
if(!this.ipcBuffer){
this.ipcBuffer='';
}
data=(this.ipcBuffer+=data);
if(data.slice(-1)!=client.eventParser.delimiter || data.indexOf(client.eventParser.delimiter) == -1){
client.log('Messages are large, You may want to consider smaller messages.');
return;
}
this.ipcBuffer='';
const events = client.eventParser.parse(data);
const eCount = events.length;
for(let i=0; i<eCount; i++){
let message=new Message;
message.load(events[i]);
client.log('detected event', message.type, message.data);
client.publish(
message.type,
message.data
);
}
if(!client.config.sync){
return;
}
client.queue.next();
}
);
}
module.exports=connect;

View file

@ -0,0 +1,33 @@
'use strict';
const Message = require('js-message');
function emit(type,data){
this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);
let message=new Message;
message.type=type;
message.data=data;
if(this.config.rawBuffer){
message=new Buffer(type,this.config.encoding);
}else{
message=this.eventParser.format(message);
}
if(!this.config.sync){
this.socket.write(message);
return;
}
this.queue.add(
syncEmit.bind(this,message)
);
}
function syncEmit(message){
this.log('dispatching event to ', this.id, this.path, ' : ', message);
this.socket.write(message);
}
module.exports=emit;

View file

@ -11,25 +11,23 @@ const net = require('net'),
let eventParser = new EventParser();
class Server extends Events{
udp4 = false;
udp6 = false;
server = false;
sockets = [];
emit = emit;
broadcast = broadcast;
constructor(path,config,log,port){
super();
Object.assign(
this,
{
config : config,
path : path,
port : port,
udp4 : false,
udp6 : false,
log : log,
server : false,
sockets : [],
emit : emit,
broadcast : broadcast
}
);
eventParser=new EventParser(this.config);
this.config = config;
this.path = path;
this.port = port;
this.log = log;
this.on(
'close',
@ -74,7 +72,7 @@ function emit(socket, type, data){
if(this.config.rawBuffer){
this.log(this.config.encoding)
message=new Buffer(type,this.config.encoding);
message=Buffer.from(type,this.config.encoding);
}else{
message=eventParser.format(message);
}
@ -104,7 +102,7 @@ function broadcast(type,data){
message.data=data;
if(this.config.rawBuffer){
message=new Buffer(type,this.config.encoding);
message=Buffer.from(type,this.config.encoding);
}else{
message=eventParser.format(message);
}
@ -152,7 +150,7 @@ function serverClosed(){
function gotData(socket,data,UDPSocket){
let sock=((this.udp4 || this.udp6)? UDPSocket : socket);
if(this.config.rawBuffer){
data=new Buffer(data,this.config.encoding);
data=Buffer.from(data,this.config.encoding);
this.publish(
'data',
data,
@ -240,7 +238,7 @@ function serverCreated(socket) {
let data;
if(this.config.rawSocket){
data=new Buffer(msg,this.config.encoding);
data=Buffer.from(msg,this.config.encoding);
}else{
data=msg.toString();
}
@ -374,7 +372,7 @@ function startTLSServer(){
}
function UDPWrite(message,socket){
let data=new Buffer(message, this.config.encoding);
let data=Buffer.from(message, this.config.encoding);
this.server.send(
data,
0,

View file

@ -1,12 +1,12 @@
'use strict';
const Defaults = require('../entities/Defaults.js'),
connectTo = require("../members/connectTo.js"),
connectToNet = require("../members/connectToNet.js"),
disconnect = require("../members/disconnect.js"),
serve = require("../members/serve.js"),
serveNet = require("../members/serveNet.js"),
log = require("../members/log.js");
connectTo = require("./members/IPC/connectTo.js"),
connectToNet = require("./members/IPC/connectToNet.js"),
disconnect = require("./members/IPC/disconnect.js"),
serve = require("./members/IPC/serve.js"),
serveNet = require("./members/IPC/serveNet.js"),
log = require("./members/IPC/log.js");
class IPC{
of={};

View file

@ -1,6 +1,6 @@
'use strict';
const Client = require('../dao/client.js');
const Client = require('../../../dao/client.js');
function emptyCallback(){};

View file

@ -1,7 +1,7 @@
'use strict';
const Client = require('../dao/client.js'),
Server = require('../dao/socketServer.js');
const Client = require('../../../dao/client.js'),
Server = require('../../../dao/socketServer.js');
function emptyCallback(){};
@ -72,8 +72,12 @@ function connectToNet(id,host,port,callback){
this.of[id].path = host;
this.of[id].port = port;
console.log(this.of[id]);
this.of[id].connect();
callback(this);
}

View file

@ -1,6 +1,6 @@
'use strict';
const Server = require('../dao/socketServer.js');
const Server = require('../../../dao/socketServer.js');
function serve(path,callback){
if(typeof path=='function'){

View file

@ -1,6 +1,6 @@
'use strict';
const Server = require('../dao/socketServer.js');
const Server = require('../../../dao/socketServer.js');
function emptyCallback(){};