Задача потока данных

Задача потока данных инкапсулирует подсистему потока данных, которая перемещает данные между источниками и назначениями, а также позволяет пользователю преобразовывать, очищать и изменять данные при перемещении. Добавление задачи потока данных в поток управления пакетами позволяет пакету извлекать, преобразовывать и загружать данные.

Поток данных состоит по крайней мере из одного компонента потока данных, но обычно это набор компонентов подключенного потока данных: источников, извлекающих данные; преобразования, которые изменяют, маршрут или суммируют данные; и назначения, которые загружают данные.

Во время выполнения задача потока данных создает план выполнения из потока данных, а подсистема потока данных выполняет план. Вы можете создать задачу потока данных, которая не имеет потока данных, но задача выполняется только в том случае, если он включает по крайней мере один поток данных.

Для массовой вставки данных из текстовых файлов в базу данных SQL Server можно использовать задачу массовой вставки вместо задачи потока данных. Однако задача массовой вставки не может преобразовывать данные. Дополнительные сведения см. в разделе "Задача массовой вставки".

Несколько потоков

Задача потока данных может включать несколько потоков данных. Если задача копирует несколько наборов данных, а если порядок копирования данных не является значительным, то может быть удобнее включить несколько потоков данных в задачу потока данных. Например, можно создать пять потоков данных, каждый из которых копирует данные из неструктурированного файла в другую таблицу измерений в схеме звездочки хранилища данных.

Однако подсистема потока данных определяет порядок выполнения при наличии нескольких потоков данных в одной задаче потока данных. Поэтому, если порядок важен, пакет должен использовать несколько задач потока данных, каждая задача, содержащая один поток данных. Затем можно применить ограничения приоритета для управления порядком выполнения задач.

На следующей схеме показана задача потока данных с несколькими потоками данных.

Потоки данных

Записи логов

Службы Integration Services предоставляют набор событий журнала, доступных для всех задач. Службы Integration Services также предоставляют настраиваемые записи журналов для многих задач. Дополнительные сведения см. в разделе логирование служб Integration Services (SSIS) и настраиваемые сообщения для логирования. Задача потока данных включает следующие пользовательские записи журнала:

Запись журнала Описание
BufferSizeTuning Указывает, что задача потока данных изменила размер буфера. Запись журнала описывает причины изменения размера и перечисляет временный размер буфера.
OnPipelinePostEndOfRowset Указывает, что компонент получил сигнал о конце набора строк, который устанавливается последним вызовом ProcessInput метода. Запись записывается для каждого компонента в потоке данных, который обрабатывает входные данные. Запись содержит имя компонента.
OnPipelinePostPrimeOutput Указывает, что компонент завершил последний вызов PrimeOutput метода. В зависимости от потока данных может быть записано несколько записей журнала. Если компонент является источником, эта запись журнала означает, что компонент завершил обработку строк.
OnPipelinePreEndOfRowset Указывает, что компонент будет получать сигнал конца набора строк, который устанавливается последним вызовом ProcessInput метода. Запись записывается для каждого компонента в потоке данных, который обрабатывает входные данные. Запись содержит имя компонента.
OnPipelinePrePrimeOutput Указывает, что компонент будет получать вызов из PrimeOutput метода. В зависимости от потока данных может быть записано несколько записей журнала.
OnPipelineRowsSent Сообщает количество строк, предоставленных входным данным компонента вызовом ProcessInput метода. Запись журнала содержит имя компонента.
PipelineBufferLeak Предоставляет сведения о любом компоненте, который хранит буферы в живых после того, как диспетчер буферов исчезает. Если буфер по-прежнему жив, ресурсы буферов не были освобождены и могут привести к утечке памяти. Запись журнала содержит имя компонента и идентификатор буфера.
PipelineComponentTime Сообщает о количестве времени (в миллисекундах), которое компонент провел в каждом из пяти основных этапов обработки: Validate, PreExecute, PostExecute, ProcessInput и ProcessOutput.
PipelineExecutionPlan Сообщает план выполнения потока данных. План выполнения содержит сведения о том, как буферы будут отправляться в компоненты. Эта информация в сочетании с записью журнала PipelineExecutionTrees описывает, что происходит в задаче потока данных.
PipelineExecutionTrees Отображает деревья выполнения разметки в потоке данных. Планировщик подсистемы потока данных использует деревья для создания плана выполнения потока данных.
PipelineInitialization Предоставляет сведения об инициализации задачи. Эти сведения включают каталоги, используемые для временного хранения данных BLOB, размера буфера по умолчанию и количества строк в буфере. В зависимости от конфигурации задачи потока данных может быть записано несколько записей журнала.

Эти записи журнала предоставляют множество сведений о выполнении задачи потока данных при каждом запуске пакета. При многократном выполнении пакетов можно записывать сведения, которые со временем предоставляют важные исторические данные об обработке, выполняемой задачей, проблемах, которые могут повлиять на производительность, и объеме данных, который обрабатывает задача.

Дополнительные сведения об использовании этих записей журнала для мониторинга и повышения производительности потока данных см. в одном из следующих разделов:

Примеры сообщений из задачи потока данных

В следующей таблице перечислены примеры сообщений для записей журнала для очень простого пакета. Пакет использует источник OLE DB для извлечения данных из таблицы, преобразование «Сортировка» для упорядочивания данных и получатель OLE DB для записи данных в другую таблицу.

Запись журнала Сообщения
BufferSizeTuning Rows in buffer type 0 would cause a buffer size greater than the configured maximum. There will be only 9637 rows in buffers of this type.

Rows in buffer type 2 would cause a buffer size greater than the configured maximum. There will be only 9497 rows in buffers of this type.

Rows in buffer type 3 would cause a buffer size greater than the configured maximum. There will be only 9497 rows in buffers of this type.
OnPipelinePostEndOfRowset A component will be given the end of rowset signal. : 1180 : Sort : 1181 : Sort Input

A component will be given the end of rowset signal. : 1291 : OLE DB Destination : 1304 : OLE DB Destination Input
OnPipelinePostPrimeOutput A component has returned from its PrimeOutput call. : 1180 : Sort

A component has returned from its PrimeOutput call. : 1 : OLE DB Source
OnPipelinePreEndOfRowset A component has finished processing all of its rows. : 1180 : Sort : 1181 : Sort Input

A component has finished processing all of its rows. : 1291 : OLE DB Destination : 1304 : OLE DB Destination Input
OnPipelinePrePrimeOutput PrimeOutput will be called on a component. : 1180 : Sort

PrimeOutput will be called on a component. : 1 : OLE DB Source
OnPipelineRowsSent Rows were provided to a data flow component as input. : : 1185 : OLE DB Source Output : 1180 : Sort : 1181 : Sort Input : 76

Rows were provided to a data flow component as input. : : 1308 : Sort Output : 1291 : OLE DB Destination : 1304 : OLE DB Destination Input : 76
PipelineComponentTime The component "Calculate LineItemTotalCost" (3522) spent 356 milliseconds in ProcessInput.

The component "Sum Quantity and LineItemTotalCost" (3619) spent 79 milliseconds in ProcessInput.

The component "Calculate Average Cost" (3662) spent 16 milliseconds in ProcessInput.

The component "Sort by ProductID" (3717) spent 125 milliseconds in ProcessInput.

The component "Load Data" (3773) spent 0 milliseconds in ProcessInput.

The component "Extract Data" (3869) spent 688 milliseconds in PrimeOutput filling buffers on output "OLE DB Source Output" (3879).

The component "Sum Quantity and LineItemTotalCost" (3619) spent 141 milliseconds in PrimeOutput filling buffers on output "Aggregate Output 1" (3621).

The component "Sort by ProductID" (3717) spent 16 milliseconds in PrimeOutput filling buffers on output "Sort Output" (3719).
PipelineExecutionPlan SourceThread0

Drives: 1

Influences: 1180 1291

Output Work List

CreatePrimeBuffer of type 1 for output ID 11.

SetBufferListener: "WorkThread0" for input ID 1181

CreatePrimeBuffer of type 3 for output ID 12.

CallPrimeOutput on component "OLE DB Source" (1)

End Output Work List

End SourceThread0

WorkThread0

Drives: 1180

Influences: 1180 1291

Input Work list, input ID 1181 (1 EORs Expected)

CallProcessInput on input ID 1181 on component "Sort" (1180) for view type 2

End Input Work list for input 1181

Output Work List

CreatePrimeBuffer of type 4 for output ID 1182.

SetBufferListener: "WorkThread1" for input ID 1304

CallPrimeOutput on component "Sort" (1180)

End Output Work List

End WorkThread0

WorkThread1

Drives: 1291

Influences: 1291

Input Work list, input ID 1304 (1 EORs Expected)

CallProcessInput on input ID 1304 on component "OLE DB Destination" (1291) for view type 5

End Input Work list for input 1304

Output Work List

End Output Work List

End WorkThread1
PipelineExecutionTrees begin execution tree 0

output "OLE DB Source Output" (11)

input "Sort Input" (1181)

end execution tree 0

begin execution tree 1

output "OLE DB Source Error Output" (12)

end execution tree 1

begin execution tree 2

output "Sort Output" (1182)

input "OLE DB Destination Input" (1304)

output "OLE DB Destination Error Output" (1305)

end execution tree 2
PipelineInitialization No temporary BLOB data storage locations were provided. The buffer manager will consider the directories in the TEMP and TMP environment variables.

The default buffer size is 10485760 bytes.

Buffers will have 10000 rows by default

The data flow will not remove unused components because its RunInOptimizedMode property is set to false.

Многие события журнала записывают несколько записей, а сообщения для ряда записей журнала содержат сложные данные. Чтобы упростить понимание и обмен данными о содержимом сложных сообщений, можно проанализировать текст сообщения. В зависимости от расположения журналов можно использовать инструкции Transact-SQL или компонент скрипта для разделения сложного текста на столбцы или другие форматы, которые можно найти более полезными.

Например, в следующей таблице содержится сообщение „Строки были предоставлены компоненту потока данных в качестве входных данных“. : 1185 : выходные данные источника OLE DB : 1180 : сортировка : 1181 : входные данные для сортировки : 76", проанализированные по столбцам. Сообщение было записано событием OnPipelineRowsSent , когда строки были отправлены из источника OLE DB в преобразование Sort.

колонна Описание Ценность
PathID Значение свойства маршрута между источником OLE DB ID и преобразованием «Сортировка». 1185
ИмяПути Значение из Name свойства пути. Выходные данные источника OLE DB
ComponentID Значение свойства преобразования Sort ID. 1180
Имя компонента Значение свойства Name преобразования Sort. Сортировать
InputID Значение свойства ID из входа для преобразования Sort. 1181
InputName Значение свойства Name входных данных для преобразования Sort. Сортировка входных данных
RowsSent Количество строк, отправляемых в входные данные преобразования Sort. 76

Настройка задачи потока данных

Свойства можно задать в окне "Свойства " или программным способом.

Дополнительные сведения о том, как задать эти свойства в окне Свойства, щелкните, чтобы открыть следующий раздел:

Программная настройка задачи потока данных

Для получения дополнительной информации о программном добавлении задачи потока данных в пакет и настройке свойств потока данных, перейдите к следующему разделу:

Задание свойств задач или контейнеров

Видео, Balanced Data Distributer на technet.microsoft.com.