Инкрементальная загрузка из баз данных. Универсальный модуль

QVD файлы

Каждый раз, когда мы загружаем данные в приложение Qlik, они запрашиваются из источников в полном объеме. Если наш источник — база данных, то при каждом запуске скрипта мы отправляем в БД запрос, который будет получать все записи, например, из огромной таблицы транзакций.

А зачем нам это делать, если с прошлого обновления данных добавилось просто несколько транзакций, а остальные данные не измененились?

Наверняка вы уже знаете про то, что загружаемые в приложение таблицы можно сохранять в QVD-файлы. Из такого файла данные загружаются в приложения быстрее, чем из любого другого источника. Эти файлы используют для того, чтобы хранить в них чистовые аналитические таблицы, докрученные скриптами Qlik, которые используются в других приложениях без изменений.

Кроме того, QVD-файлы открывают нам возможность инкрементальной загрузки из источников.

Принципы инкрементальной загрузки в Qlik

Смысл инкрементальной загрузки в том, чтобы взять из источник (базы данных) только те данные, которые изменились с момента последнего обновления. Часть данных, которые не изменились, догружаются из QVD-файла и склеивается в одну таблицу. Опционально, также происходит удаление записей, которые удалены в источнике, с помощью inner join. Это все подробно описано в справке Qlik.

Мы же здесь собрались, чтобы познакомиться с инструментом, который делает процесс инкрементального обновления более удобным и управляемым.

Модуль инкрементальной загрузки

Не смотря на простоту подхода, обеспечение инкрементальной загрузки — довольно суетное дело.

  • Нужно сначала создать QVD файл, а потом писать вокруг него инкрементальную загрузку;
  • Нужно предусматривать сценарии для полной перезагрузки данных, если изменился набор полей в источнике;
  • Нужно контроллировать содержимое переменных, содержащих отметки времени о последней загрузке;
  • Возможно, вам придется разбираться с часовыми поясами, в которых фиксируются данные — если они не совпадут со временем на сервере Qlik, придется корректировать таймстампы в функции now().

Чтобы покончить с этим, и быстро запускать надежное инкрементальное обновление данных, мы разработали наш модуль в виде подпрограммы для скрипта Qlik Sense. Его код будет в конце статьи, а сейчас расскажем как он работает.

Подпрограмма покрываем самый полный сценарий инкрементальной загрузки: загрузка измененных записей с исключением удаленных в источнике записей.

Вызов подпрограммы происходит так:

В начале идет перечисление параметров загрузки.

vStore_Path — указываем путь сохранения файла

set vStore_Path=lib://QVD Layer 2.0/Temporary/amoCRM APIv4/Tasks.qvd;

vPrimary_Key — Наименование поля первичного ключа в источнике. Чтобы инкрементальная загрузка работала, в источнике обязательно должен быть первичный ключ, уникальный для каждой записи. Иначе не получится правильно догружать данные из QVD-файла.

set vPrimary_Key=i_taskID; //Первичный ключ

vTS_Field — наименование поля, содержащее отметку времени последнего изменения сущности. Учитывайте, что время должно быть сохранено в формате Datetime, а не Unix Timestamp.

set vTS_Field=i_lastModified; //Поле для отметки времени обновления записи

vTS_Format — формат даты и времени в поле со временем изменения сущности.

set vTS_Format=DD.MM.YYYY hh:mm:ss;//Формат метки времени в поле

vTS_Where_Format — форматирование метки времени для подстановки в запрос к базе данных. Метка времени обозначается как $1, вокруг нее выписываются функции форматирования.

Конструкции типа $1, $2, $3 и т.д. в переменных позволяют в будущем подставить на их место какое-либо значение при вызове переменных. Их используют для повышения универсальности кода.

Смысл в том, что в базе метка времени может храниться не в том формате, который считывает Qlik (Qlik, если распознает поле как дату, применяет к ней свой собственный формат даты). С помощью этой функции можно привести число в вид, который будет понятен базе данных в операторе where.

set vTS_Where_Format=chr(39)&date($1,'YYYY-MM-DD hh:mm:ss')&chr(39);

vTS_Field_Format — обрамление поля дат в условии where. Поле определяется как $1, вокруг него пишутся функции на синтаксисе базы данных.

set vTS_Field_Format=$1;//обрамление поля источника в условии Where. Используется синтаксис источника

vFull_Reset — если установлена единица, происходит полный запрос данных из источника. Иначе говоря, отключается инкрементальное обновление. Полезно, когда в источнике изменился набор полей и нужно переписать все данные в QVD.

set vFull_Reset=0; //Поставьте 1, чтобы перезаписать таблицу полностью

vDelete_Records — задействовать ли блок кода для удаления строк.

set vDelete_Records=1; //1, чтобы удалить строки, удаленные из источника

vDrop_Table — удалять ли сформированную таблицу после сохранения в QVD.

set vDrop_Table=1;//1, чтобы удалить из модели итоговую таблицу

vTable_Source — запрос к источнику. Буквально содержит код запроса, как если бы он писался непосредственно в скрипте загрузки.

Обратите внимание, что на том месте, где по логике должно располагаться условие where, по которому будут отбираться данные только с момента последнего обновления, нужно прописать $1.

let vTable_Source= //Обязательно поставьте $1 на месте, где должено быть условие where
'SQL SELECT 
	`i_taskID`,
    `s_taskID`,
    `entity_type`,
    `entity_id`,
    `i_taskTypeID`,
    `i_dateCreate`,
    `i_lastModified`,
    `i_completeTill`,
    `i_status`,
    `i_createdUserID`,
    `i_responsibleUserID`,
    `s_text`
FROM rusfishcom.`amo.tasks` $1;';

vDelete_Records_Code — код для загрузки первичных ключей из БД. Нужен для дальнейшего удаления из QVD записей, несуществующих в источнике. Вместо названия поля можно написать переменную vPrimary_Key, ведь она уже содержит наименование первичного ключа.

let vDelete_Records_Code= // опишите загрузку первичных ключей из источника. Используйте $(vPrimary_Key) вместо указания первичного ключа
'SQL SELECT 
    `$(vPrimary_Key)`
FROM rusfishcom.`amo.tasks`;';

Подпрограмма вызывается командой call Inc_Load. Никаких параметров в ней нет.

Работа подпрограммы

Если вы формируете файл первый раз, то при загрузке произойдет стандартный запрос всех данных из источника, а сами данные сохранятся в QVD файл.

Фокус в том, что в мета-данных этого файла будет сохранена метка времени самого последнего изменения сущностей в источнике. Т.е. нам не нужно создавать переменные, которые будут помнить дату последней загрузки и как-то управлять ими. Эта информация берется непосредственно из данных.

Форматирование даты в переменной vTS_Where_Format нужно для того, чтобы перевести это число в формат, который поймет база данных.

При повторной загрузке вы увидите сообщение, что выполняется загрузка обновленных данных, а также запрос к источнику с подставленным условием where. Здесь отлично видно, что число было преобразовано в дату с помощью переменной vTS_Where_Format. Переменная vTS_Field_Format позволяет нам при необходимости обработать поле с меткой времени изменения функциями на синтаксие БД.

Теперь при запросе к БД будут загружаться только данные с максимальной меткой времени на последний апдейт. Остальное будет подхватываться из QVD.

Если в источнике изменится состав полей, то новые поля появятся только у записей, обновленных после добавления полей. Чтобы значения новых полей прошли у всех записей, нужно сделать полную перезапись файла с помошью переменной vFull_Reset.

Рекомендации к использованию

Используйте модуль для первичного получения данных из БД. Сложные преобразования проводите уже над QVD-файлом.

Исходный код

Пример вызова подпрограммы

//Основные параметры
set vStore_Path=lib://QVD Layer 2.0/Temporary/amoCRM APIv4/Tasks.qvd; //Путь сохранения файла
set vPrimary_Key=i_taskID; //Первичный ключ
set vTS_Field=i_lastModified; //Поле для отметки времени обновления записи
//Форматы
set vTS_Format=DD.MM.YYYY hh:mm:ss;//Формат метки времени в поле
set vTS_Where_Format=chr(39)&date($1,'YYYY-MM-DD hh:mm:ss')&chr(39); //Форматирование метки времени для использования в условии Where. Используется синтаксис Qlik
set vTS_Field_Format=$1;//обрамление поля источника в условии Where. Используется синтаксис источника

set vFull_Reset=0; //Поставьте 1, чтобы перезаписать таблицу полностью
set vDelete_Records=1; //1, чтобы удалить строки, удаленные из источника
set vDrop_Table=1;//1, чтобы удалить из модели итоговую таблицу

let vTable_Source= //Обязательно поставьте $1 на месте, где должено быть условие where
'SQL SELECT 
	`i_taskID`,
    `s_taskID`,
    `entity_type`,
    `entity_id`,
    `i_taskTypeID`,
    `i_dateCreate`,
    `i_lastModified`,
    `i_completeTill`,
    `i_status`,
    `i_createdUserID`,
    `i_responsibleUserID`,
    `s_text`
FROM rusfishcom.`amo.tasks` $1;';

let vDelete_Records_Code= // опишите загрузку первичных ключей из источника. Используйте $(vPrimary_Key) вместо указания первичного ключа
'SQL SELECT 
    `$(vPrimary_Key)`
FROM rusfishcom.`amo.tasks`;';

call Inc_Load;

Код подпрограммы (нужно вставлять где-нибудь в начале скрипта, до загрузки данных)

sub Inc_Load_Full_Reload
		trace Выполняется полная перезагрузка данных;
		[$(vInc_Load_Store_Name)]:
		$(vTable_Source(where 1=1));
    
    	Inc_Load_MaxDate: load
    	num(max(date#(FieldValue('$(vTS_Field)',recno()),'$(vTS_Format)'))) as Inc_Load_MaxDate
    	AutoGenerate FieldValueCount('$(vTS_Field)');
    
    	let vInc_Load_MaxDate=replace(peek('Inc_Load_MaxDate',0,'Inc_Load_MaxDate'),',','.'); drop table Inc_Load_MaxDate; 
    
    	Comment table [$(vInc_Load_Store_Name)] with "MaxTS:$(vInc_Load_MaxDate)";    

		Store [$(vInc_Load_Store_Name)] into [$(vStore_Path)] (qvd); 
        
		if vDrop_Table=1 then
    	drop table [$(vInc_Load_Store_Name)];
   		end if
        
end sub

sub Inc_Load

let vInc_Load_Store_Name=SubField(subfield('$(vStore_Path)','.qvd',1),'/',SubStringCount('$(vStore_Path)','/')+1);
Trace vInc_Load_Store_Name=$(vInc_Load_Store_Name);

let vInc_Load_File_Exists=0;
////Проверка наличия файла в пути сохранения
for each vStore_Path_Exist in FileList ('$(vStore_Path)');
let vInc_Load_File_Exists=vInc_Load_File_Exists+1;
next



if NoOfRows(vStore_Path_Exist)>0 or vFull_Reset=1 or vInc_Load_File_Exists=0 then

	call Inc_Load_Full_Reload;

else

    
    Inc_Load_MaxDate:
	LOAD
    	subfield("Comment",':','2') as Inc_Load_MaxDate
	FROM [$(vStore_Path)]
	(XmlSimple, table is QvdTableHeader);
    
    vInc_Load_MaxDate_Data=replace(peek('Inc_Load_MaxDate',0,'Inc_Load_MaxDate'),',','.'); drop table Inc_Load_MaxDate;
    vInc_Load_MaxDate_Data=$(vTS_Where_Format(vInc_Load_MaxDate_Data));
    
    if len(vInc_Load_MaxDate_Data)=0 then
    	Trace !!!
    	Файл найден, но в нем нет сохраненной метки последней перезагрузки.
    	Выполняется полная перпезагрузка
    	!!!;
    	call Inc_Load_Full_Reload;
    else
    
		trace Выполняется загрузка обновленных данных
$(vTable_Source(where $(vTS_Field_Format($(vTS_Field)))>=$(vInc_Load_MaxDate_Data)));
        
        
    
		[$(vInc_Load_Store_Name)]:
		$(vTable_Source(where $(vTS_Field_Format($(vTS_Field)))>=$(vInc_Load_MaxDate_Data)));
        
        Concatenate([$(vInc_Load_Store_Name)]) load * from [$(vStore_Path)] (qvd) where not Exists([$(vPrimary_Key)]);
        
        if vDelete_Records=1 then
        
        inner join ([$(vInc_Load_Store_Name)]) load *;
        $(vDelete_Records_Code);
        
        
        end if
        
    Inc_Load_MaxDate: load
	num(max(date#(FieldValue('$(vTS_Field)',recno()),'$(vTS_Format)'))) as Inc_Load_MaxDate
	AutoGenerate FieldValueCount('$(vTS_Field)');
    
	let vInc_Load_MaxDate=replace(peek('Inc_Load_MaxDate',0,'Inc_Load_MaxDate'),',','.'); drop table Inc_Load_MaxDate; 
    
	Comment table [$(vInc_Load_Store_Name)] with "MaxTS:$(vInc_Load_MaxDate)";    

	Store [$(vInc_Load_Store_Name)] into [$(vStore_Path)] (qvd);
    
    if vDrop_Table=1 then
    	trace Идет удаление строк, отсутствующих в источнике;
    	drop table [$(vInc_Load_Store_Name)];
    end if
    
    
	end if
    
end if
	
set vStore_Path_Exist=;
end sub

Выводы

Теперь можно быстро внедрить инкрементальное обновление без необходимости писать сложный код поддержки разных сценариев загрузки, и дублируя запросы под инкремент/не инкремент. А также беспокоиться о поддержке переменных с метками времени.

6 комментариев

  1. Евгений спасибо, очень облегчаете обучение Qlik.
    Опечатки: измененились , контроллировать , должено , синтаксие , помошью , должено , перпезагрузка

      1. Добрый день.
        Прошу прощения, да скрипт рабочий. Но пришлось немного где-то дописать от себя.
        Вопрос такой, зачем в данном блоке используем RecNo()? Нам ведь нужно получить метку даты в 5-значных цифрах. И для чего нужен AutoGenerate FieldValueCount? Вроде далее это нигде не используем.
        Inc_Load_MaxDate: load
        num(max(date#(FieldValue(‘$(vTS_Field)’,recno()),’$(vTS_Format)’))) as Inc_Load_MaxDate
        AutoGenerate FieldValueCount(‘$(vTS_Field)’);
        Кстати, этот блок у меня не работал, добавил resident.
        Inc_Load_MaxDate: load
        num(max(date($(vTS_Field),’$(vTS_Format)’))) as [Inc_Load_MaxDate]
        resident [$(vInc_Load_Store_Name)];

        1. Этой штукой мы получаем максимальное значение таймстампа в поле с датой изменения. Эта конструкция:

          load FieldValue(Поле,recno()) autogenerate FieldValueCount(Поле); Позволяет получить все уникальные значения поля, используя отбор из индексной таблицы, а не перебор значений из исходной таблицы. Потом он помещается в переменную и пишется в комментарий QVD таблицы, чтобы потом оттуда быть прочитанным и использоваться как точка отсчета для загрузки обновленных данных. Мне сложно сказать, как у вас сейчас работает код после внесенных исправлений)

          1. Все, теперь понял.
            Классный скрипт. Спасибо большое.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *