Redux-Cluster продолжение или синхронизация памяти в процессах кластера серверов.

Redux-Cluster Или о чем эта статья

В данной статье я подробно разберу функционал библиотеки redux-cluster, архитектурные схемы работы с памятью, автоматическое тестирование модуля, а также приведу пример использования.

В предыдущей статье я рассказал о идее создания библиотеки синхронизации памяти в кластере (имеется ввиду модуль cluster). Библиотека прошла тест и была внедрена на боевой сервер, каких бы то ни было значительных проблем выявлено не было. Однако, мне стало интересно пойти дальше и реализовать возможность «общего» (в реальности память процессов изолирована и факт существования доступа к памяти другого процесса был бы значительным бэкдором, поэтому в каждом процессе будет своя копия стека и все эти копии должны быть максимально идентичны в единицу времени) стека памяти не только в модуле cluster, но и в обособленных процессах. А также возможность синхронизации памяти на удаленных серверах.

Чтобы не привязываться к библиотекам типа socket.io (не подумайте, мне нравятся готовые решения, но в данном случае речь о скорости и производительности) было принято решение минимально использовать библиотеки не входящие в стандартный стек NodeJS (стандартный стек считаю тем, что описан в документации фонда). Как оказалось позже, я все же парочку использовал в силу собственной лени (да и смысл переписывать то, что уже показало свою стабильность и производительность в проектах), а именно 2 библиотеки JSONStream и event-stream. Эти же модули я, например, использую для потокового парсинга входящего json при заливке данных в MongoDB а там нагрузки в сотни, если не тысячи раз выше, так что не думаю что с этим могут быть проблемы (количество установок данных библиотек тому явное подтверждение).

Итак за основу было принято взять модуль NET, т.к. он позволяет охватить весь спектр сокетов:

  1. TCP
  2. Unix file-сокеты
  3. Именованные каналы Windows

Базовая схема Redux-Cluster или нативные методы Redux

Как установить и подключить модуль вы, конечно, знаете и с Redux вероятно уже работали (что такое диспатчер/редьюсер/функция работы со стором вы тоже знаете), по крайней мере это вам потребуется освоить прежде чем использовать данный модуль. Для того чтобы было от чего отталкиваться опишу стандартный функционал Redux (реализован в версии 1.0):

  1. Создание хранилища
    var Test = ReduxCluster.createStore(editProcessStorage);
    function editProcessStorage(state = {versions:[]}, action){
    	var state_new = Lodash.clone(state);
    	switch (action.type){
    		case 'UPDATE': 
    			state_new.versions = action.payload.versions;
    			return state_new;
    			break;
    	}
    	return state_new;
    }
  2. Подписка на обновление
    Test.subscribe(function(){ ... });
  3. Вызов события в редьюсере (обновление хранилища)
    Test.dispatch({type:'UPDATE', payload: {versions:Test.getState().versions}});
  4. Получение состояния хранилища
    Test.getState()

Как видно это все штатные методы Redux, так в чем же разница? А разница в том, что достаточно выполнить п.1 в каждом процессе кластера (или форкнуть нужное количество одинаковых процессов cluster.fork()) и методы п. 2-4 будут доступны в каждом процессе, а стек памяти Test.getState() вернет идентичные результаты для каждого процесса в единицу времени (условно). Но это все версия 1.0 суть которой я уже описал в другой статье.

Выход за пределы Redux или возможности Redux-Cluster

В процессе разработки версии 1.1 (она уже не актуальна на данный момент) я отказался от переопределения Redux хранилища на клиенте, вместо этого я просто подменяю все диспатчеры на собственные с приоритетным событием

REDUX_CLUSTER_SYNC

которое вызовет синхронизацию стеков памяти. Был разработан сервер, для создания которого уже созданное хранилище Test (см. п.1) имеет метод createServer:

Test.createServer({host: "0.0.0.0", port: 8888, logins:{test2:'123456'}});

Входящим параметром является Object, который может иметь параметры

  • path — путь к файловому сокету Linux или именованный канал Windows, который будет прослушиваться сервером (если задан, параметры host и port опускаются). Тут есть важный момент, что именованные каналы в Windows не могут создаваться дочерними процессами кластера.
  • host — имя хоста или ip-адрес интерфейса, который будет прослушиваться сервером (по умолчанию 0.0.0.0 — т.е. все интерфейсы)
  • port — порт, который будет прослушиваться сервером (по умолчанию 10001)
  • logins — пары имя пользователя: пароль вида {login1:password1, login2:password2} для авторизации в сокете

В сервер встроена система блокировки при пятикратном вводе некорректных данных авторизации (блокирует IP адрес клиента на 3 часа). Авторизация также используется и для файлового сокета/именованного канала. Идентификация происходит по SHA1 имени пользователя и пароля с солью. Тем не менее я не советую использовать синхронизацию напрямую через интернет без использования дополнительных способов защиты (ограничение по ip в firewall, vpn и т.д.). При падении сервер автоматически восстанавливается. Используя одну пару логин:пароль может быть подключено несколько клиентов.

Для подключения к серверу созданное хранилище Test (см. п.1) имеет метод createClient:

Test.createClient({host: "localhost", port: 8888, login:"test2", password:'123456'});

Входящим параметром является Object, который может иметь параметры

  • path — путь к файловому сокету Linux или именованный канал Windows к которому должен подключиться клиент (если задан, параметры host и port опускаются)
  • host — имя хоста или ip-адрес интерфейса, к которому должен подключиться клиент (по умолчанию localhost)
  • port — порт, к которому должен подключиться клиент (по умолчанию 10001)
  • login — имя пользователя для авторизации
  • password — пароль для авторизации

В клиенте есть важный нюанс, если createClient было вызвано в дочернем процессе кластера (Worker, модуль cluster), то синхронизация с Master процессом будет отключена. Это обусловлено, что Master может также выступать первичным для хранилища Redux и состояние Redux в Worker будет подвержено одновременному изменению из из потока Master и из потока createClient. В отношении сервера таких ограничений нет, им может выступать как Master, так и Worker (исключая невозможность создания именованного канала worker-ом в windows). Т.о. для синхронизации памяти «первичного» кластера (Master и n-Workers) с «вторичным» (Master и n-Workers) сервер может быть создан в любом процессе, а клиентом должен выступать Master процесс «вторичного» кластера. Если же клиентом выступит один из n-Worker «вторичного» кластера, то синхронизация будет только с данным Worker. Т.е. для синхронизации с несколькими процессами нужно будет создать по экземпляру клиента в каждом из них. При этом сервер может быть один общий.

Клиент автоматически восстанавливает соединение при сбое, до тех пор пока соединение не будет восстановлено dispatch из client невозможен (subcribe соответственно также не будет вызван, а возможен лишь getState()). Как только соединение будет восстановлено клиент получит свежий снимок Redux от server. При этом, если client это Master процесс у которого есть несколько синхронизированных Worker — все они могут только читать (getState()). Для того чтобы отлавливать данные моменты наше хранилище способно возвращать статус соединения Boolean:

Test.connected;
  • true — соединение установлено
  • false — соединение разорвано

Причем при разрыве соединения Master в описанной выше схеме, во всех Worker процессах статус также будет установлен как false, а при восстановлении соединения вернется в true.

Для наведения «порядка» в отношениях Master и Worker, Server и Client были созданы роли. Они представляют собой массив и их можно получить по из созданного в п.1 хранилища:

Test.role;

На данный момент они бывают четырех видов и их может быть несколько (настоятельно не рекомендую изменять роли вручную, т.к. это приведет к нестабильности библиотеки):

  • master — получает dispatch от worker и отправляет action в worker
  • worker — получает action от master и отправляет dispatch в master
  • client — получает action от server и отправляет dispatch в server
  • server — получает dispatch от client и отправляет action в client

Роли могут накладываться, т.е., например роль [«worker», «server»] будет получать action от master и в тоже время отправлять их в client, он будет получать dispatch от client и отправлять из в master и в тоже время будет отправлять собственные dispatch в master. Если немного разобраться, то можно составить несколько логических схем, которые покажут что роль [«worker», «client»] невозможна, т.к. worker получает action и от master, и от server, и в тоже время отправляет dispatch и в master и в server. Это просто вызовет рассинхронизацию хранилища, т.к. master и server могут быть изолированы. Именно поэтому при вызове createClient в Worker процессе роль worker будет удалена (а вместе с ней и обработка сообщений от/отправка к master) и останется только [«client»], что в полной мере соответствует описанному для данной роли поведению.

Все это замечательно работает с динамически обновляющимися данными небольшого размера, но есть у меня проект iocommander, который я потихоньку привожу в нормальный вид. В нем несколько хранилищ Redux и они раздуты до нескольких мегабайт. При этом данные изменяются очень быстро, что может вызывать сотню dispatch в секунду. Т.е. сотню раз в секунду хранилище размером в десяток мегабайт должно разлететься во все процессы кластера. Все это создает значительную нагрузку на ЦП. Для того чтобы уменьшить нагрузку мне нужно передавать снимок хранилища только при старте (пересоединении), а во все остальные случаи отправлять action. Изначально я сам парсил json из сокета и единичные ошибки можно было попросту опустить, но в случае action данные ошибки недопустимы. Для этого я использовал библиотеки JSONStream и event-stream. Для того, чтобы уйти от рассинхронизации при выпадении action я решит отправлять снимок хранилища для полной синхронизации раз в 100 (по умолчанию) action. Изменить поведение синхронизации можно задав созданному в п.1 хранилищу соответствующий тип:

Test.mode = "action";

Типов собственно два:

  • snapshot — обмен snapshot при каждом изменении (сбои исключены в принципе, но, в случае больших хранилищ, нагрузка на ЦП)
  • action — передача snapshot при установке соединения и каждый 100 action, в остальных случаях передается только action (меньшая нагрузка на ЦП, но теоретически возможны расхождения в данных)

По факту же передача action ускоряет работу, т.е. данные обновляются быстрее и вероятность идентичности данных в единицу времени даже растет. Я рекомендую использовать за умолчание snapshot если ваше приложение не сильно грузит ЦП, иначе action. В случае использования режима action не стоит производить сложных обработок внутри редьюсера, т.к. эти сложные обработки будут возникать в каждом процессе и в итоге вы даже проиграете в производительности относительно snapshot.

Тип синхронизации актуален и для базовой схемы Master->Workers[] и для схемы Server->Clients[], при этом ключевую роль играет тип заданный в Master и Server соответственно.

Можно изменить период полной синхронизации параметром:

Test.resync = 1000;

Также можно подключить сохранение хранилища на диск и чтение с него при старте (ВНИМАНИЕ! Это доступно в каждом процессе, т.е. нужно задать разные пути для каждого процесса). Если вам необходим этот функционал, рекомендую использовать его в первичном server (и/или master), а также в удаленных осколках кластера (master). Это позволит при старте master загрузить актуальные данные во все подчиненные процессы (client и worker). Подключить работу с диском можно функцией:

Test.backup(<Object>).catch(function(err){
    ... обработка ошибок
});

Результат вернет объект Promise. Входящие данные — объект вида:

{
    path:'./test.backup', 
    key:"password-for-encrypter", 
    count:1000
}
  • path — путь к сохраняемому снимку
  • key — ключ для шифрования/дешифрования снимка
  • timeout — количество секунд, за которые допускается потеря данных (если задано count опускается)
  • count — количество action за которые допускается потеря данных

Иногда в функции возникают не критичные ошибки (критичные я специально не оборачиваю в try..catch/Promise чтобы они все же приводили к сбою приложения), которые мне нужно куда-то испускать. Пример таких ошибок это разрыв соединения, ошибка в обработке json и т.п. По умолчанию я испускал все это в console.error (собственно и сейчас так), но при разрывах соединения эти ошибки могут слишком забивать лог. Для того чтобы иметь возможность выключить логгирование или обработать их собственным способом хранилище Test имеет функцию stderr, которую можно заменить:

Test.stderr = function(err){}

где err — текст ошибки (String). В указанном примере ошибки не будут никуда выводится.

Архитектурные схемы управления памятью в Redux-Cluster

Пример реализации базовой схемы (Master-процесс и 4 Worker-процесса):

В реализации сервера все тоже самое, но добавляется синхронизация в сокет (испускание action) и прослушивание dispatch из сокета сервером:

Схема клиента выглядит схоже, если клиентом выступает Master процесс. Все отличие в том что Master теперь шлет dispatch не в собственный Redux, а в сокет и получает из него синхронизацию (action):

Схема клиента, если клиентом выступает Worker отличается, как видно связи с Master разорваны:

Схема клиента, когда им выступает отдельный процесс абсолютно аналогична  схеме клиента в Worker:

Из таких базовых схем можно составлять сложные конструкции вида:

Здесь есть небольшая неточность с тем что Nammed Channel не может быть создан в дочернем процессе кластера.

Тестирование библиотеки Redux-Cluster

Для проверки работы библиотеки пришлось немного «извратиться» с тестами. Изначально я пытался тестировать мануально (визуально, в лог) но мне требовалось тестирование в псевдо-боевых условиях высокой нагрузки, визуально это тестировать слишком сложно, особенно в случае режима action. Тогда я пошел таким путем:

  • Создаю кластер, в котором есть Master1 и Worker1 в каждом два хранилища Test1 и Test2.
  • Worker1 и Master1 испускают Test1.dispatch
  • Worker1 подписан на синхронизацию Test1 и Test2 (subscribe, обновление хранилища целиком, т.е. snapshot)
  • Master1 сравнивает Test1 и Test2, если они идентичны — говорит ок. сравнение производится через равные интервалы времени.

Мне было лень реализовывать сложный редьюсер, а изменение одного параметра в объекте state по сути означало бы всегда режим snapshot и не показывало бы реальных расхождений. Для того, чтобы покрыть это тестом я пушу в массив элементы числом, т.е. сравнение двух массивов уже более гарантированная идентичность хранилищ. Однако, эти массивы раздувались непомерно и вызывали бы утечку памяти, что ограничило бы тест по времени. Для того, чтобы уйти от этого, как только массив достигнет размера в 500 элементов я срезаю из него первую сотню. Собственно код теста:

/**
 *	Redux-Cluster Test
 *	(c) 2018 by Siarhei Dudko.
 *
 *	standart test (cluster IPC channel)
 *	LICENSE MIT
 */

"use strict"

var ReduxCluster = require('./index.js'),
	Cluster = require('cluster'),
	Lodash = require('lodash'),
	Colors = require('colors');
	
function editProcessStorage(state = {versions:[]}, action){ 
	try {
		switch (action.type){
			case 'TASK': 
				var state_new = Lodash.clone(state);
				if(state_new.versions.length > 500){
					state_new.versions.splice(0,100);
				}
				state_new.versions.push(action.payload.version);
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

function editProcessStorage2(state = {versions:[]}, action){
	try {
		switch (action.type){
			case 'UPDATE': 
				var state_new = Lodash.clone(state);
				state_new.versions = action.payload.versions;
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

	
var Test = ReduxCluster.createStore(editProcessStorage);
Test.mode = "action";
var Test2 = ReduxCluster.createStore(editProcessStorage2);

if(Cluster.isMaster){
	setTimeout(function(){Cluster.fork();}, i*20000)
	
	Test.dispatch({type:'TASK', payload: {version:'MasterTest0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'MasterTest'+i}});
		i++;
	}, 55);
	
} else {
	
	Test.dispatch({type:'TASK', payload: {version:'WorkerTest0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'WorkerTest'+i}});
		i++;
	}, 99, i);
	
	Test.subscribe(function(){
		Test2.dispatch({type:'UPDATE', payload: {versions:Test.getState().versions}});
	});
}

if(Cluster.isMaster){
	var ok = 0;
	var bad = 0;
	setInterval(function(){
		if(Lodash.isEqual(Test.getState().versions, Test2.getState().versions)){
			ok++;
			console.log(Colors.green("ok-"+ok+'|'+parseInt((ok*100/(ok+bad)), 10)+'%'));
		}else {
			bad++;
			console.log(Colors.red("bad-"+bad+'|'+parseInt((bad*100/(ok+bad)), 10)+'%'));
		}
	}, 1000);
}

Тестирование в течение 8 часов показало 75% идентичности хранилищ в режиме action, учитывая что в итоге через 8 часов были совпадения хранилищ (данные не разошлись), это в достаточной мере гарантирует, что синхронизация работает достаточно стабильно.

Решил усложнить тест и протестировать два кластера через TCP, схема примерно такая же:

  • Создаю кластер, в котором есть Master1 и Worker1 в каждом два хранилища Test1 и Test2.
  • Создаю кластер, в котором есть Master2 и Worker2 в каждом два хранилища Test1 и Test2.
  • Master1 выступает client для Test2 и server для Test1
  • Master2 выступает client Для Test1
  • Worker2 выступает server для Test2
  • Worker1, Master1 и Worker2, Master2 испускают Test1.dispatch
  • Worker2 подписан на синхронизацию Test1 и Test2 (subscribe, обновление хранилища целиком, т.е. snapshot)
  • Worker1 сравнивает Test1 и Test2, если они идентичны — говорит ок. сравнение производится через равные интервалы времени.

Код для Cluster1 (Master1 и Worker1):

/**
 *	Redux-Cluster Test
 *	(c) 2018 by Siarhei Dudko.
 *
 *	standart test (cluster IPC channel)
 *	LICENSE MIT
 */

"use strict"

var ReduxCluster = require('./index.js'),
	Cluster = require('cluster'),
	Lodash = require('lodash'),
	Colors = require('colors');
	
function editProcessStorage(state = {versions:[]}, action){ 
	try {
		switch (action.type){
			case 'TASK': 
				var state_new = Lodash.clone(state);
				if(state_new.versions.length > 500){
					state_new.versions.splice(0,100);
				}
				state_new.versions.push(action.payload.version);
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

function editProcessStorage2(state = {versions:[]}, action){
	try {
		switch (action.type){
			case 'UPDATE': 
				var state_new = Lodash.clone(state);
				state_new.versions = action.payload.versions;
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

	
var Test = ReduxCluster.createStore(editProcessStorage);
Test.mode = "action";
var Test2 = ReduxCluster.createStore(editProcessStorage2);

if(Cluster.isMaster){
	
	Test.createServer({host: "0.0.0.0", port: 8888, logins:{test2:'123456'}});
	Test2.createClient({host: "localhost", port: 8889, login:"test2", password:'123456'});
	
	setTimeout(function(){Cluster.fork();}, i*20000);
	
	Test.dispatch({type:'TASK', payload: {version:'MasterTest0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'MasterTest'+i}});
		i++;
	}, 500);
	
} else {
	
	Test.dispatch({type:'TASK', payload: {version:'WorkerTest0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'WorkerTest'+i}});
		i++;
	}, 505, i);
}

if(!Cluster.isMaster){
	var ok = 0;
	var bad = 0;
	setInterval(function(){
		if(Lodash.isEqual(Test.getState().versions, Test2.getState().versions)){
			ok++;
			console.log(Colors.green("ok-"+ok+'|'+parseInt((ok*100/(ok+bad)), 10)+'%'));
		}else {
			bad++;
			console.log(Colors.red("bad-"+bad+'|'+parseInt((bad*100/(ok+bad)), 10)+'%'));
			console.log(Test.getState().versions.length+' | '+Test2.getState().versions.length)
			console.log(Test.getState().versions[Test.getState().versions.length-1]+' | '+ Test2.getState().versions[Test2.getState().versions.length-1] );
		}
	}, 500);
}

Код для Cluster2 (Master2 и Worker2):

/**
 *	Redux-Cluster Test
 *	(c) 2018 by Siarhei Dudko.
 *
 *	standart test (cluster IPC channel)
 *	LICENSE MIT
 */

"use strict"

var ReduxCluster = require('./index.js'),
	Cluster = require('cluster'),
	Lodash = require('lodash'),
	Colors = require('colors');
	
function editProcessStorage(state = {versions:[]}, action){ 
	try {
		switch (action.type){
			case 'TASK': 
				var state_new = Lodash.clone(state);
				if(state_new.versions.length > 500){
					state_new.versions.splice(0,100);
				}
				state_new.versions.push(action.payload.version);
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

function editProcessStorage2(state = {versions:[]}, action){
	try {
		switch (action.type){
			case 'UPDATE': 
				var state_new = Lodash.clone(state);
				state_new.versions = action.payload.versions;
				return state_new;
				break;
			default:
				break;
		}
	} catch(e){
	}
	var state_new = Lodash.clone(state);
	return state_new;
}

	
var Test = ReduxCluster.createStore(editProcessStorage);
var Test2 = ReduxCluster.createStore(editProcessStorage2);

if(Cluster.isMaster){
	
	Test.createClient({host: "localhost", port: 8888, login:"test2", password:'123456'});
	
	setTimeout(function(){Cluster.fork();}, i*20000);
	
	Test.dispatch({type:'TASK', payload: {version:'MasterRemote0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'MasterRemote'+i}});
		i++;
	}, 550);
	
} else {
	
	Test2.createServer({host: "0.0.0.0", port: 8889, logins:{test2:'123456'}});
	
	Test.dispatch({type:'TASK', payload: {version:'WorkerRemote0'}});
	var i = 0;
	setInterval(function(){
		Test.dispatch({type:'TASK', payload: {version:'WorkerRemote'+i}});
		i++;
	}, 560, i);
	
	Test.subscribe(function(){
		Test2.dispatch({type:'UPDATE', payload: {versions:Test.getState().versions}});
	});
}

Тестирование за аналогичное время показало 92% идентичности (тесты проводились параллельно на одной машине). Значительных нагрузок по ЦП или ОЗУ выявлено не было.

Пример применения Redux-Cluster

Библиотека Redux-Cluster позволяет синхронизировать память в нескольких процессах, это безусловно удобно. Но помимо этого она позволяет запустить клиента к работающему серверу и залезть ему в «мозги» (память процесса, ака стор Redux). Это позволяет по сути не только читать getState(), но и производить (испускать) удаленные dispatch в Redux на сервере. Мы пойдем дальше и напишем CLI для работы с сервером.

Итак у меня есть модуль, занимающийся логгированием. Я хочу, чтобы в моей CLI был доступен вывод всего лога. При этом на сервере может быть Master и множество Worker или вообще множество Master-ов и Worker-ов (мы рассмотрим первый вариант с одним Master для наглядности). Для того, чтобы получить доступ к выводу консоли каждого из процессов я создам в своем LOGGER Redux-Cluster хранилище, которое будет содержать текущую запись лога. При этом сервер будет создаваться Master-процессом (т.к. я использую windows и именованный канал соответственно), Worker будут клиентами. А моё CLI будет помимо того, что клиентом еще и подписчиком на события в Redux-Cluster. Итак в коде это будет выглядеть так:

var loggerStorage = REDUXCLUSTER.createStore(loggerStore);
	loggerStorage.stderr = function(){};
function loggerStore(state = {type:"LOG", text:""}, action){
	try {
		var state_new = {type:action.type, text: action.payload};
		return state_new;
	} catch(e){
		stdout.warn(COLORS.yellow(datetime()+e));
	}
}
if(process.mainModule.filename.indexOf('process.master.js') !== -1){
	FS.unlink(PATH.join(OS.tmpdir(), "iRetailCClog.sock"), (err) => {
		loggerStorage.createServer({path:PATH.join(__dirname, "TEST-log.sock"), logins:{"***":"****"}});
	});
} else {
	loggerStorage.createClient({path:PATH.join(__dirname, "TEST-log.sock"), login:"***", password:"****"});
}
if(process.mainModule.filename.indexOf('process.console.js') !== -1){
	if(process.argv.indexOf("-v") !== -1)
		loggerStorage.subscribe(function(){
			var _strObj = loggerStorage.getState();
			switch(_strObj.type){
				case 'LOG':
					stdout.log(COLORS.green(_strObj.text));
					break;
				case 'WARN':
					stdout.warn(COLORS.yellow(_strObj.text));
					break;
				case 'ERROR':
					stdout.error(COLORS.red(_strObj.text));
					break;
				case 'HTTP':
					stdout.log(COLORS.gray(_strObj.text));
					break;
				case 'DEBUG':
					stdout.log(_strObj.text);
					break;
			}
		});
}

Как видно у меня есть мастер процесс process.master.js, процесс CLI process.console.js и все остальные. При этом CLI по умолчанию запускается без логгирования, а лог доступен при запуске с флагом -v, т.е.:

node process.console.js -v

Далее мне осталось создать дополнительно в каждой функции stdout испускание dispatcher в loggerStorage, разберу на примере stdout.log():

function log(_val){
	var val = datetime() + _val;
	stdout.log(COLORS.green(val));
	if(process.mainModule.filename.indexOf('process.console.js') === -1)
		loggerStorage.dispatch({type:"LOG" ,payload:val});
}

Реализация CLI:

 "use strict"
 
var READLINE = require('readline'),
	PATH = require('path');	
	
var LOGGER = require(__dirname+'/module.logger.js'),
	PROCSTORE = require(__dirname+'/procstore.js');
	PROCSTORE.stderr = function(err){};
	PROCSTORE.createClient({path:PATH.join(__dirname, "TEST.sock"), login:"***", password:"****"});
	
var P = PROCSTORE;
var stdout = LOGGER;

var commands = {
	"--help": {
		title: "Вызов текущей справки.",
		exec: help
	},
        "P": {
		title: "Снимок хранилища.",
		exec: function(){console.log(P.getState());}
	},
};

var arr = [];
for(const key in commands){
	arr.push({"команда":key, "значение":commands[key].title});
}
function help(){
	console.table(arr, ['команда', 'значение']);
}

help();
const rl = READLINE.createInterface({
	input: process.stdin,
	output: process.stdout,
	prompt: 'CLI> '
});	
rl.prompt();
rl.on('line', (line) => {
	try{
		if(typeof(commands[line]) !== 'undefined'){
			commands[line].exec();
		} else {
			var _tmp = eval(line);
			if(typeof(_tmp) !== 'undefined'){
				console.log(_tmp);
			}
		}
	} catch(err) {
		stdout.error("Ошибка команды: "+err);
		help();
	}
	rl.prompt();
}).on('close', () => {
	stdout.log('CLI disconnected!');
	process.exit(0);
});