Буйство потоков или как раскатить миллионы записей на сотню клиентов ч.1

Предисловие

Статей, возможно, будет две, если конечно не поленюсь и найду время. С последним нынче беда. Итак, это первая статья посвящена идеям и реализации сервера. Полных исходников в ней не будет, не смотря на то что сервер уже в бета-тесте.

Итак, существует у нас некий API-сервис по работе с NoSQL. Собственно сама NoSQL представлена кластером MongoDB. Изначально существовало серверное API на GO и клиенты на том же GO. Ну и некая внешняя API. Принцип работы клиент-серверной части заключался в том, что в NoSQL заливается куча информации с различных узлов связи, будь то 1С:Предприятие (через REST), MSSQL-базы, некоторые внешние сервисы. Вся эта информация хранится и, что важнее для данного вопроса, некоторую часть этой информации необходимо передавать на разрозненные MSSQL-сервера.

Сейчас не рассматриваю вопрос получения данных, интересует именно раздача. Изначально она была организована так:

  1. у клиента есть задания в виде файлов
  2. клиент шлет запрос на апи с заданными параметрами поиска
  3. сервер делает выборку согласно заданным параметрам, разбивает её на на 10 частей (ограничение выборки 10 частей по 1000 документов)
  4. кладет каждую часть выборки в redis с уникальным ключом
  5. возвращает массив уникальных ключей клиенту
  6. клиент за 10 запросов достает данные (10 тыс. штук), после успешного запроса соответствующий ключ в redis очищается
  7. клиент вставляет каждую запись insert-ом (т.е. 10 тыс. вставок)
  8. у каждого документа есть версия (грубо говоря таймштамп, по которому и происходит отбор)
  9.  при каждой вставке обновляется дополнительно запись общего таймштампа для таблицы (в отдельной таблице)

Система с версиями вполне могла быть жизнеспособной, но очень недальновидной. С одной стороны, она позволяет контролировать каждый документ (его версию)., обновлять только нужные документы (фактически сравнивая каждый) и т.д.

Уже только из описания можно увидеть проблемы данной архитектуры (допустим, критиковать не стану, т.к. писали её более опытные люди. просто тихо ругнусь матом):

  1. на каждую вставку имеем по сути 2 действия (1 вставка строки, вторая обновление общей версии). итого на 10 тыс. документов по сути 10 тыс. вставок и 10 тыс. обновлений в SQL базе
  2. не совсем ясен смысл прослойки в виде redis, которая поможет только с очень долгими клиентами. но в целом процесс только тормозит
  3. перед апи стоит nginx который вообще не ясно для чего, ведь статика не отдается
  4. ну очень странное решение было применено при помещении данных в redis: выборка документов обжимается gzip-ом (это вроде бы нормально), затем превращается в строку base64 (base64 это +30% объема данных, Карл. В чем смысл сжимать данные, а потом раздувать размер на 30%). ну и для полного трэша эта base64 является всего лишь одним полем json ответа, т.е. сжимается даже не весь ответ
  5. сам о апи не user-friendly, т.е. далеко отходит от языка собственно mongodb

В общем эти проблемы видны невооруженным взглядом. Другая проблема была в том что у нас нет гуру GO.

Ну а третья, то что в один прекрасный день наша база раздулась на 8 миллионов документов, которые нужно было раскатить на 80 mssql-серверов. Итого 640 млн. документов суммарно должно было пройти через это апи. Основная часть этой работы было сделано мимо апи (оно просто умерло при первых же запросах). Тем не менее проблема осталась и возникала даже при необходимости обновить каких-нибудь 500 тыс. документов (логично, умножаем на 80). И нужно было оперативно что-то решать. Проблемы вызывал redis, в котором отсутствовали документы. В какой момент они пропадали и что вообще происходит под такой DDoS-атакой на апи разобрать проблематично. Т.к. документов там не было, база постоянно была завалена запросами (очередь была из тысяч запросов). Ну и как следствие все процессоры уходили в отказ. Отсутствие гуру GO усложняло дебаг.

Исходя из своего опыта, могу сказать что выучить GO (ну или любой другой язык) не такая уж и проблема. Проблема вникнуть в framework (со старта, не зная языка), еще большая проблема вникнуть в архитектуру при фактическом отсутствии документации и комментариев в коде (ну и самого разработчика).

Ну и самое главное, суть ведь не в том чтобы просто вникнуть, суть в том что нужно было сделать что-то в разы более производительное. А для этого уже нужно иметь некоторый опыт работы с языком.

В общем-то я не нашел ничего лучше, чем реализовать на том, на чем у меня есть неплохой опыт построения производительных микросервисов — NodeJS.

Идея

  1. Реализация с контролем каждой записи оставляла желать лучшего. Контролировать актуальность записей все таки нужно, логично, что необходимо контролировать выборку.
  2. Средство контроля достаточное, на мой взгляд, хэши в sha1.
  3. Мной было принято решение (в ходе предварительных тестов), что можно взять за аксиому: в единицу времени одинаковый запрос в базу данных вернет одинаковые данные.
  4. Ничто не мешает записать ответ базы данных в файл, а затем объединить поток чтения из данного файла и поток ответа сервера. Это позволит сократить одинаковые запросы в базу данных до 1 уникального (соответственно уменьшить в 80 раз нагрузку на базу).
  5. Чтобы этот файл был привязан к тексту запроса, имя файла будет хэш запроса приведенного к строке.
  6. Чтобы уменьшить объем данных, мы можем сжать выборку из базы данных gzip-ом перед записью в файл. Т.о. мы еще и сократим в 80 раз расход ресурсов ЦП на обработку потока (он сожмет его один раз, а не на каждом ответе сервера).
  7. Помимо хэша запроса, ничто не мешает нам собрать хэш результатов выборки.
  8. Чтобы клиент не тянул одинаковые данные с сервера, на клиенте и сервере должны храниться пары хэш-запроса — хэш-результата. При обмене эти данные могут быть переданы в хидере. При этом, если полученные хэши совпадают с теми что есть на сервере, можно вернуть клиенту соответствующий статус сервера (данные не изменились), а не всю выборку. Экономим трафик.
  9. Чтобы контролировать кэширование, сервер должен хранить данные о таймауте кэширования и таймштамп, когда кэш был записан.
  10. Операция одиночной вставки слишком тормозная, гораздо быстрее чистить таблицу и вставлять при помощи bulk (протестировано на node mssql вставка миллионов записей измеряется минутами при относительно небольшом расходе ресурсов).
  11. Чтобы клиенту не затирать всю таблицу при одном неудачном запросе, в этой таблице должен быть служебный столбец, в который пишется хэш запроса. Т.о. при неудачном запросе, или ошибке, или изменении хэша результатов — можно удалить только строки, вставленные этим запросом.

Реализация сервера

1.При получении запроса сервер собирает хэш запроса (HASH).

2.Проверяет наличие документа в коллекции Request с {«type» : «hash»,»hash» : «(HASH)»}. И то что срок хранения кэша не истек. Структура объекта (уникальный индекс hash должен быть создан в коллекции для ускорения поиска):
— type — всегда hash
— hash — хэш запроса
— timeout — таймаут кэширования данных
— timestamp — метка времени в epoch
— hashresult — хэш результата

{ 
"_id" : ObjectId("5b3652953effee1c936851ae"), 
"type" : "hash", 
"hash" : "f92319e748e075a0a24a7006e2bbef18799508ca", 
"timeout" : NumberInt(86400000), 
"timestamp" : 1530289944925.0, 
"hashresult" : "b5a6e42b40787b8cd831805b19b1ab04586b3a68"
}

3. Проверяет наличие кэша в файловой системе (путь указан в конфиге, на папку должны быть полные права), имя файла (HASH).

4. Если условия в п.2 и п.3 выполнены идем в п.5, иначе в п.6.

5. Если хэш результата из хидера ics-cache-hashresult соответствует хранимому значению в hashresult документа из п.2 идем в п.5.1, иначе в п.5.2.
5.1. Возвращаем статус 208 (контент не изменился). Идем в п.5.3.
5.2. Возвращаем статус 200 и в потоке отдаем ответ из файла. Идем в п.5.3.
5.3. Если изменился таймаут кэширования обновляем его в коллекции Request. Завершаем обработку.

6. Если в внутренней базе данных приложения запущен процесс формирования кэша идем в п.6.1, иначе в п.6.2.
6.1. Возвращаем статус 206 (Частичный контент). Клиент должен повторить запрос позже. Завершаем обработку.
6.2. Создаем курсор на коллекцию Data (согласно заданным в запросе параметрам).

7. Если количество документов в курсоре больше нуля идем в п.8, иначе в п.7.1.
7.1. Возвращаем статус 204 (Нет контента). Обновляем данные в коллекции Request и завершаем обработку.

8. Создаем поток подсчета хэша результатов.

9. Создаем поток трансформации (для приведения к валидному json) из курсора. В функции трансформации обновляем поток из п.8.

10. Создаем поток записи в файловую систему.

11. Создаем поток сжатия.

12. Если в конфиге задано использование сжатия, то объединяем потоки из п.9-11, иначе потоки из п.9 и п.10.

13. По завершении потока из п.10, уничтожаем потоки из п.8,9 и 11. Шлем мастеру сообщение, что процесс формирования кэша завершен.

14. Отдаем в потоке результат запроса из файла. Обновляем данные в коллекции Request и завершаем обработку.

Интересности

Первое стоит отметить это то, что глупо создавать такой сервис вне кластера. Т.о. нам необходим мастер процесс и воркеры (рабочие) согласно числу ядер (потоков) ЦП. Все это должно работать согласовано. Т.о. если у нас кэшируется один запрос и поступает такой же — не должно быть дубликата кэширования. Для этого я использовал связку Redux (во всех процессах одинаковое хранилище). При обновлении хранилища в мастере по средствам IPC оно передается воркерам, которые, в свою очередь обновляют собственные (целиком). При необходимости изменений в хранилище воркеры по средствам IPC сообщают мастеру о необходимом изменении, тот обновляет хранилище и далее, как описал выше. Т.о. при запуске кэширования — мы пишем передаем хэш запроса и id воркера мастеру, тот добавляет данные в хранилище. При завершении — по аналогичной схеме хэш удаляется из хранилища. В свою очередь остальные воркеры при поступлении запроса проверяют, что он уже не выполняется. Тут есть нюанс, если воркер умер, то все процессы кэширования в нем неуспешны (то что в статусе запущены), для этого нам и нужно передавать id воркера мастеру. Тот не только добавляет информацию, что запрос выполняется, но еще и связывает её с воркером. Когда воркер вернул статус exit (завершен), мы проверяем id живых воркеров и те id что есть в хранилище. И соответственно снимаем с запросов мертвого воркера статус выполнения. А также запускаем новый форк, вместо убитого.

По аналогичной схеме происходит логгирование, т.е. воркеры сами в консоль не пишут (хотя она у них и общая с мастером), они по средствам IPC передают сообщения мастеру, а тот пишет в консоль. При этом сообщения могут быть разных типов. Т.о. есть тип ошибка, при получении таких сообщений мастер не только пишет в лог, но и вызывает функцию отправки ошибки на email.

Изначально я использовал библиотеку CryptoJS и cursor.toArray(). При небольших запросах все было нормально, но при запросах от 100 тыс. Сборка хэша стала потреблять огромное количество памяти, да и преобразование в массив выборки тоже было довольно затратно. В итоге я пришел к тэтому:

cursor.count(function(err, count) {
	try{
		if(err){
			throw err;
		} else{
			if(count > 0){	//проверяем что выборка больше 0
				var tempuid = Hasher(JSON.stringify(parsedDataclear.find) + parsedDataclear.limit.toString() + JSON.stringify(parsedDataclear.sort) + parsedDataclear.skip.toString()); //имя файла
				var pathFile = query_path+tempuid;	//путь к файлу записи
				var filestream = fs.createWriteStream(pathFile);	//поток записи в файл
				const hash = crypto.createHash('sha1');	//поток генерации хэша
				var summ = 0;	//количество обработанных документов
				function transformDocs(doc){	//обработка документов перед записью в файл
					switch(summ){
						case 0:
							var newdoc = '['+JSON.stringify(doc)+',';	//первый док
							break;
						case (count-1):
							var newdoc = JSON.stringify(doc)+']';	//последний док
							break;
						default:
							var newdoc = JSON.stringify(doc)+',';	//остальные доки
							break;
					}
					summ++;	//увеличиваем кол-во обработанных документов на 1
					hash.update(newdoc); //обновляем хэш
					return newdoc;
				}
				var tempstream = cursor.stream({transform: x => transformDocs(x)});	//поток чтения данных из базы
				const gzipper = zlib.createGzip();	//поток сжатия
				function closerErrStream(err){ //обработка ошибок в 3-х потоках
					if(err){
						tempstream.unpipe(); //отвязываем потоки
						tempstream.destroy();	//уничтожаем потоки
						filestream.destroy(err);
						gzipper.destroy(err);
						hash.destroy(err);
						fs.unlink(pathFile, (error) => {	//удаляем файл
							if (!error) {
								process.send({warning:datetime() + 'Удален некорректный файл: ' + pathFile});
							}
						});
						process.send({error:datetime() + 'Ошибка записи результатов запроса в файл: ' + err});
						resolve([500, 'Internal Server Error']);
					} else {
						filestream.end(); //закрываем поток записи
					}
				};
				if(usegzip){
					tempstream.pipe(gzipper).pipe(filestream);
				} else {
					tempstream.pipe(filestream);
				}
				filestream.on("finish", function(){ //в случае успешной записи возвращаем 201 и новый hash
					tempstream.destroy();	//уничтожаем потоки
					gzipper.destroy();
					resolve([201, tempuid, tempuid, hash.digest('hex'), timeout]);
					hash.destroy();
					process.send({task:{on:'del', hash:tempuid, worker: cluster.worker.id}});
				});
				filestream.on("error", function(err){ //обработка ошибок потоков
					closerErrStream(err);
				});
				tempstream.on("error", function(err){ 
					closerErrStream(err);
				});
				gzipper.on("error", function(err){ 
					closerErrStream(err);
				});
				hash.on("error", function(err){ 
					closerErrStream(err);
				});
			} else {
				resolve([204, 'No Content', testuid, Hasher(""), timeout]);
			}
		}
	} catch(err){
		process.send({error:datetime() + 'Ошибка обработки запроса: ' + err});
		resolve([500, 'Internal Server Error']);
	}
});

Т.о. мы приводим курсор к потоку трансформации. В функции трансформации мы преобразуем поток документов в валидный json, передаем документы в поток подсчета хэша. После чего мы объединяем поток трансформации с потоком сжатия и потоком записи в файл. В итоге мы имеем в файле валидный json — Обработанный Gzip. Ну и по завершении потока записи или ошибках в потоках не забываем убить все открытые потоки. Для потокового подсчета хэша понадобилось сменить CryptoJS на Crypto, cursor.toArray() на cursor.stream(), а также применить сжатие в потоке. Протестировано на миллионе документов, все работает как часы.