Буйство потоков или как раскатить миллионы записей на сотню клиентов ч.2

Вступление

А вот и вторая статья об обмене большими данными, на этот раз о клиенте о клиенте. Первая была о сервере ТУТ. Напомню, что речь идет импорте данных из NoSQL (MongoDB) в SQL (MSSQL), а именно о больших объемах схожих данных. Т.е. в коллекции есть набор документов с одинаковым параметром (например тип), нам нужно синхронизировать весь этот набор с таблицей sql. При этом набор может содержать миллионы документов.
Сервер, кстати, тоже претерпел доработки. В основном они минимальные и заключались в фиксе некоторых багов.
Из значительного, была реализована работа с репликами mongoDB на чтение и примари на запись (т.к. примари сервер у нас вечно загружен, то даже хэши запросов решено было читать с реплик, нотя в изначальном варианте работа с служебной коллекцией была на примари сервере). Так сказать лайфхак, чтобы не мучать примари сервер и не отдавать распределение нагрузки на саму MongoDB (которая у нас её вообще не распределяет как оказалось).
На данный момент сервис прошел бета-тест, где за 4 часа клиенты получили 691.980.813 документов и обновили соразмерное число записей. В тесте участвовал 81 клиент.
Почему на это ушло 4 часа, вместо запланированных 1,4 часа? Сложно сказать, ведь по сути сервис зависит от:

  1. Интернет-соединения (например, 1 клиент просто ушел в оффлайн на несколько часов, после чего успешно выполнил все за запланированные 1,4 часа).
  2. Блокировки/нагрузка MS-SQL, к сожалению не имел возможности промониторить данный процесс, т.к. обновление запустил ночью. А ночью программисты тоже хотят спать.
  3. Загрузка ЦП, собственно все операции производимые клиентом требуют значительных ресурсов процессора, а они не то чтобы везде производительные.
  4. Нюансы архитектуры 32bit и 64bit ОС клиентов, а также разные версии ОС (клиенты все на MS Windows).
  5. Доступность RAM. Для контроля целостности и обратной связи о результате операции обрабатывать данные полностью на потоках — весьма проблемный путь. Даже если сделать объектный поток, поймав некорректный объект — мы не сможем к нему вернуться. Уже не говоря о том, что Bulk требует выборки документов. Т.о. я выбрал размер чанков по 100тыс.документов, которые и контроллировал. А вот операция с чанками данного размера в среднем потребляет до 500МБ RAM. У нас же, к сожалению, есть в «зоопарке» ПК где объем доступной (свободной от иных задач) оперативной памяти в пределах 100-200МБ.

Конфигурация

Итак, типично, полных исходников тут не будет 🙂 Ну и вернемся к логике клиента, он имеет

  1. конфигурационный файл, в котором заданы адрес сервера и настройки подключения к MSSQL.
  2. папку tasks с задачами вида
{
	"database":"***************",
	"interval":360,
	"limit":100000,
	"cache":720,
	"table":{
		"name":"dbo.***************",
		"margins":{
			"Guid":{"param":"***************"},
			"Version":{"param":"***************"},
			"Type":{"requisite":"***************"},
			"Barcod":{"requisite":"***************"},
			"Status":{"requisite":"***************"}
		},
		"notnull":[
			"Barcod"
		]
	},
	"request":{
		"type": {"$eq": "***************"}
	}
}
  • database — наименование базы данных
  • interval — интервал запуска, гарантирует что задача не запустится раньше чем указанное в минутах время с последнего запуска
  • limit — выборка документов. ВАЖНО: маленькая выборка сильно грузит сервер (т.е. запросов будет больше, но размер их меньше), большая выборка потребляет оперативную память на клиенте. При этом при маленьких выборках значительно возрастает время выполнения заданий. Оптимально для обмена данными использовать выборку около 100тыс документов.
  •  cache — таймаут кэширования в минутах, ВАЖНО: таймаут в идеале должен быть больше interval+время выполнения задачи, иначе при сбое задачи — таймаут, возможно, уже истек и начнется повторное кэширование. Маленький таймаут значительно грузит сервер.
  • table — настройки выгрузки в mssql
    • — name — имя таблицы
    • margins — поля таблицы по принципу «Имя_столбца_MSSQL»:{«type_param»:Имя_ключа_в_MongoDB}, где type_param может быть param — ключ в объекте или requisite — ключ в реквизите объекта
    • notnull — не нулевые поля, можно проверить в проекте таблицы
  • request — запрос, полностью идентичный db.Data.find() в MongoDB

Логика работы

Для понимания логики, нужно определить что такое актуальное задание. Итак, задание считается актуальным, если:

  • оно еще не запущено
  • таймаут перезапуска истек
  1. мастер чекает задачи раз в 5 минут и обновляет их в собственной БД (в RAM)
    1. мастер проходит по задачам в собственной БД раз в 15 сек в поисках актуальных заданий.
    2. мастер запускает и поддерживает в рабочем состоянии воркеров кол-во которых = кол-ву ядер ЦП, если воркер умер — запускает нового (обнуляя все задания, которые он выполнял).
    3. мастер проверяет, что воркер не выполняет задачу дольше таймаута (выставил в 12 часов), иначе убивает воркер и создает нового.
    4. отправляет ошибки на email
  2. найдя актуальное задание, мастер проверяет наличие свободных воркеров (не более 1 задания на воркер).
    1. найдя свободный воркер, мастер добавляет заданию статус (выполнение), т.о. оно считается более не актуальным
    2. мастер передает задание воркеру
  3. воркер получив задание проверяет запись в таблице [ICS_Task], запись должна быть наименование задания и его SHA1 сумма.
    1. если запись отсутствует — воркер её создает.
    2. если запись существует — воркер проверяет что SHA1 полученного задания соответствует SHA1 в БД.
      1. если они не соответствуют — воркер очищает таблицу, указанную в задании (полностью, т.к. мы не можем оперативно контроллировать целостность данных), очищает связанные с заданием хэши в таблице [ICS_Hash] (иначе таблица не будет заполняться, пока хэш не изменится), обновляет SHA1 в бд и начинает выполнение задания
      2. если они соответствуют — воркер начинает выполнение задания
  4. Проверив, что кол-во не успешных запросов меньше 5 (иначе прекращаем выполнение), воркер считает SHA1 сумму поискового запроса.
    1. Если количество не успешных попыток больше 5, завершаю выполнение. отправляю мастеру что задача завершена (мастер сделает ей статус «не запущена»).
  5. Воркер делает запрос в служебную таблицу [ICS_Hash], где лежат пары SHA1 запроса — SHA1 результата и добавляет их в хидеры запроса (если они есть)
  6. Делает REST-запрос с заданными параметрами, динамических параметра 2 это выборка n-документов(limit) и пропуск n-документов(skip)
    1. Если запрос успешен (статус 200 или 201), приводит данные к виду массив объектов:ключ(имя столбца)-значение (согласно заданию), если не успешен идем в п.4.
      1. Очищаем в таблице все поля, привязанные к данному запросу (служебное поле ics_hash)
      2. Если обработка данных успешна делаем вставку выборки (bulk) в таблицу. Если вставка успешна — пишем пару хэшей в [ICS_Hash]. При ошибке идем в п.4.
      3. Меняем limit и skip и идем в п.4
    2. Если запрос успешен (статус 204), т.е. сервер вернул что данных нет.
      1. Удаляю все записи таблицы (из задания), привязанные к данному запросу (мало ли, раньше этот запрос содержал данные).
      2.  Обновляю пару хэш запроса — хэш результата.
      3. Завершаю выполнение. Отправляю мастеру сообщение что воркер завершил задание и оно может быть актуальным.
    3. Если запрос успешен (статус 206, т.е. сервер уже выполняет кэширование данных по аналогичному запросу), идем в п.4 с отсрочкой в 1 минуту.
    4. Если запрос успешен (статус 208, т.е. сервер вернул что данные не изменились), меняем limit и skip и идем в п.4.
    5. В остальных случаях идем в п.4 (ошибка).

Запуск запросов прописан через функцию Random с зависимостью от id workera и случайного компонента, чтобы не забивать сервер запросами.

Интересности

Библиотека mssql  метод bulk не работает с объектом ключ-значение (имя столбца-значение), по крайней мере у меня её заставить так работать не удалось. А вставлять массивом — идея мне не нравится, т.к. нет привязки к столбцу. Пришлось спуститься на уровень ниже до библиотеки tedious, о чем, в принципе, не жалею. Все вполне удобно и смысла от дополнительной обертки нет.
В режиме вставки bulk требует создать структуру данных, перед вставкой. Но это можно обойти, просто присвоив всем данным текстовые поля (правда от null и not null уйти не удалось), при вставке происходит преобразование типов, если есть проблемы — выкидывает исключительную ситуацию.
Вот собственно сам bulk:

function bulkMSSQL(db_server, db_user, db_password, _task, _data, debug, _msg, _summCount){
	var msg = lodash.clone(_msg);	//чистим объекты, кроме _summCount
	return new Promise(function(resolve, reject){
		try {
			const task = lodash.clone(_task); //чистим объекты
			const data = lodash.clone(_data);
			connMSSQL(db_server, db_user, db_password, task.database, debug, _msg).then( function(conn){
				function loadBulkData(){
					try {
						const columns = lodash.keys(data[0]);
						var options = { keepNulls: true };
						var bulkLoad = conn.newBulkLoad(task.table.name, options, function (error, rowCount) {
							try{
								if(error){
									throw error;
								} else {
									_summCount.summ = _summCount.summ + rowCount;
									process.send({log:logMsg(msg)+'Добавлено строк: ' + _summCount.summ});
									resolve('ok');
									conn.close();
								}
							} catch(err){
								process.send({error:logMsg(msg)+'Ошибка вставки строк(bulk): ' + err});
								resolve('error');
								conn.close();
							}
						});
						for(const col in columns){
							if(task.table.notnull.indexOf(columns[col]) === -1){
								bulkLoad.addColumn(columns[col], mssql.TYPES.NVarChar, { length: 255, nullable: true });
							}else {
								bulkLoad.addColumn(columns[col], mssql.TYPES.NVarChar, { length: 255, nullable: false });
							}
						}
						for(var key = 0; key < data.length; key++){
							bulkLoad.addRow(data[key]);
						}
						conn.execBulkLoad(bulkLoad);
					} catch(err){
						process.send({error:logMsg(msg)+'Ошибка функции loadBulkData: ' + err});
						conn.close();
						resolve('error');
					}
				}
				if(conn !== 'error'){
					loadBulkData();
				} else {
					resolve('error');
				}
			}).catch(function(error){
				resolve('error');
			});
		} catch(err){
			process.send({error:logMsg(msg)+'Ошибка в функции bulkMSSQL: ' + err});
			resolve('error');
		}
	});
}

Также довольно странный факт, что потоковый toJSON() значительно более затратный, чем JSON.stringify() с точки зрения RAM.
Node 8.11 на одном и том же задании с Node 10.6 потребляла на 400МБ(в пике) больше памяти.
Buffer.concat() весьма затратная операция. Если у нас есть набор буфферов, то лучше (в разы) сначала пушить их в массив, а потом делать Buffer.concat([Array]), чем Buffer.concat([old Buffer],[new Buffer]) в цикле.
Если обрабатывать чанки как строку в POST запросе можем получить нечитаемый символ русской кирилицы в UTF-8. Т.к. символ кирилицы занимает 2 байта, они могут попасть в разные чанки и получим два нечитаемых символа.

Итоги

Вот собственно и все о чем хотелось написать. Статья вышла сухая, но она исчерпывающе описывает логику проекта. При этом не было цели «слить» реализацию, а лишь задать вектор тому, кому это будет полезным.

Доработки (Важно)

Из-за тормозов mongodb на больших skip-ах была переделана логика работы.  skip больше 1млн приводил к незначительному торможению запросов, но увеличив его до 8млн запрос уже выполнялся от 6 минут до получаса что ни в какие ворота. Решение было использовать агрегацию.

По умолчанию MongoDB создает значения для поля _id типа ObjectID. Это значение определено в BSON spec и структурировано таким образом:
ObjectID (12 байтов HEX string) = Дата (4 байта, значение временной метки, представляющее количество секунд с эпохи Unix) + MAC-адрес (3 байта) + PID (2 байта) + Счетчик (3 байта)

Т.о. у нас есть фактически уникальный индекс с привязкой к метке времени. В таблицу [ICS_Hash] мы добавляем поле [LastUid], куда будем писать _id последнего пришедшего в выборке объекта. В том же запросе, где мы получаем [Hash] и [HashResult] добавляю к SELECT-у еще и [LastUid], который будет добавляться в запрос по принципу:

var request = {}; //запрос
request._id = {}; //создаем _id - объект внутри запроса
request._id["$qt"] = LastUid; //задаем выборку _id старше последнего

При обновлении [ICS_Hash] дополнительно пишем [LastUid] из запроса. Соответственно, чтобы не включать на клиенте библиотеку mongodb — сервер должен _id возвращать как string.

На сервере в функции трансформации потока делаем _id.toString():

function transformDocs(_doc){	//обработка документов перед записью в файл
	var doc = lodash.clone(_doc);
	doc["_id"] = doc["_id"].toString();
	switch(summ){
		case 0:
			var newdoc = '['+JSON.stringify(doc)+',';	//первый док
			break;
		case (count-1):
			var newdoc = JSON.stringify(doc)+']';	//последний док
			break;
		default:
			var newdoc = JSON.stringify(doc)+',';	//остальные доки
			break;
	}
	summ++;	//увеличиваем кол-во обработанных документов на 1
	hash.update(newdoc); //обновляем хэш
	return newdoc;
}

А в функции валидации входящего запроса, наоборот превращаем _id в объект:

function inputDataValidQuery(data, query_lim){
	try {
		var dataobject = JSON.parse(data.toString());
		var datavalid = {find:{}, sort:{"_id": 1}, limit:10000, skip:0};
		if(typeof(dataobject.find) === 'object'){
			datavalid.find = dataobject.find;
			if(typeof(datavalid.find["_id"]) === 'object'){
				for(const k in datavalid.find["_id"]){
					datavalid.find["_id"][k] = ObjectID(datavalid.find["_id"][k]);
				}
			} else if (typeof(datavalid.find["_id"]) === 'string'){
				datavalid.find["_id"] = ObjectID(datavalid.find["_id"]);
			}
			if(typeof(dataobject.sort) === 'object'){
				datavalid.sort = dataobject.sort;
			}
			if((typeof(dataobject.limit) === 'number') && !isNaN(dataobject.limit) && (dataobject.limit < 1000000)){ //установливаю предел выборки в 1млн
				datavalid.limit = dataobject.limit;
			} else {
				datavalid.limit = query_lim;
			}
			if((typeof(dataobject.skip) === 'number') && !isNaN(dataobject.skip)){ 
				datavalid.skip = dataobject.skip;
			}
			return datavalid;
		} else {
			return 'err';
		}
	} catch(e){
		return 'err';
	}
}