Threading — управление параллельными потоками


Оглавление (нажмите, чтобы открыть):

Многопоточность

Введение в многопоточность. Класс Thread

Одним из ключевых аспектов в современном программировании является многопоточность . Ключевым понятием при работе с многоопоточностью является поток. Поток предствляет некоторую часть кода программы. При выполнении программы каждому потоку выделяется определенный квант времени. И при помощи многопоточности мы можем выделить в приложении несколько потоков, которые будут выполнять различные задачи одновременно. Если у нас, допустим, графическое приложение, которое посылает запрос к какому-нибудь серверу или считывает и обрабатывает огромный файл, то без многопоточности у нас бы блокировался графический интерфейс на время выполнения задачи. А благодаря потокам мы можем выделить отправку запроса или любую другую задачу, которая может долго обрабатываться, в отдельный поток. Поэтому, к примеру, клиент-серверные приложения (и не только они) практически не мыслимы без многопоточности.

Основной функционал для использования потоков в приложении сосредоточен в пространстве имен System.Threading . В нем определен класс, представляющий отдельный поток — класс Thread .

Класс Thread определяет ряд методов и свойств, которые позволяют управлять потоком и получать информацию о нем. Основные свойства класса:

Статическое свойство CurrentContext позволяет получить контекст, в котором выполняется поток

Статическое свойство CurrentThread возвращает ссылку на выполняемый поток

Свойство IsAlive указывает, работает ли поток в текущий момент

Свойство IsBackground указывает, является ли поток фоновым

Свойство Name содержит имя потока

Свойство Priority хранит приоритет потока — значение перечисления ThreadPriority

Свойство ThreadState возвращает состояние потока — одно из значений перечисления ThreadState

Некоторые методы класса Thread:

Статический метод GetDomain возвращает ссылку на домен приложения

Статический метод GetDomainID возвращает id домена приложения, в котором выполняется текущий поток

Статический метод Sleep останавливает поток на определенное количество миллисекунд

Метод Abort уведомляет среду CLR о том, что надо прекратить поток, однако прекращение работы потока происходит не сразу, а только тогда, когда это становится возможно. Для проверки завершенности потока следует опрашивать его свойство ThreadState

Метод Interrupt прерывает поток на некоторое время

Метод Join блокирует выполнение вызвавшего его потока до тех пор, пока не завершится поток, для которого был вызван данный метод

Метод Start запускает поток

Получение информации о потоке

Используем вышеописанные свойства и методы для получения информации о потоке:

В этом случае мы получим примерно следующий вывод:

Так как по умолчанию свойство Name у объектов Thread не установлено, то в первом случае мы получаем в качестве значения этого свойства пустую строку.

Статус потока

Статусы потока содержатся в перечислении ThreadState:

Aborted : поток остановлен, но пока еще окончательно не завершен

AbortRequested : для потока вызван метод Abort, но остановка потока еще не произошла

Background : поток выполняется в фоновом режиме

Running : поток запущен и работает (не приостановлен)

Stopped : поток завершен

StopRequested : поток получил запрос на остановку

Suspended : поток приостановлен

SuspendRequested : поток получил запрос на приостановку

Unstarted : поток еще не был запущен

WaitSleepJoin : поток заблокирован в результате действия методов Sleep или Join

В процессе работы потока его статус многократно может измениться под действием методов. Так, в самом начале еще до применения метода Start его статус имеет значение Unstarted . Запустив поток, мы изменим его статус на Running . Вызвав метод Sleep, статус изменится на WaitSleepJoin . А применяя метод Abort, мы тем самым переведем поток в состояние AbortRequested, а затем Aborted, после чего поток окончательно завершится.

Приоритеты потоков

Приоритеты потоков располагаются в перечислении ThreadPriority:

По умолчанию потоку задается значение Normal. Однако мы можем изменить приоритет в процессе работы программы. Например, повысить важность потока, установив приоритет Highest. Среда CLR будет считывать и анализировать значения приоритета и на их основании выделять данному потоку то или иное количество времени.

Как выполнять параллельные задачи (Threads) в программе для Arduino

В микропроцессорной технике параллельно выполняющиеся задачи называются тредами (Threads) – потоками. Это очень удобно, ведь часто бывает необходимо выполнять несколько операций одновременно. А можно ли заставить микроконтроллер Arduino выполнять сразу несколько задач, как настоящий процессор? Сейчас посмотрим.

  • Arduino UNO или иная совместимая плата;
  • 1 светодиод (вот из такого набора, например);
  • 1 пьезопищалка (вроде этой);
  • соединительные провода (рекомендую вот такой набор);
  • макетная плата (breadboard);
  • персональный компьютер со средой разработки Arduino IDE.

Инструкция по созданию параллельных потоков в программе для Arduino

1 Схема подключения для демонстрации потоков в работе с Arduino

Вообще говоря, Arduino не поддерживает настоящее распараллеливание задач, или мультипоточность. Но можно при каждом повторении цикла loop() указать микроконтроллеру проверять, не наступило ли время выполнить некую дополнительную, фоновую задачу. При этом пользователю будет казаться, что несколько задач выполняются одновременно.

Например, давайте будем мигать светодиодом с заданной частотой и параллельно этому издавать нарастающие и затихающие подобно сирене звуки из пьезоизлучателя. И светодиод, и пьезоизлучатель мы уже не раз подключали к Arduino. Соберём схему, как показано на рисунке.

Если вы подключаете светодиод к цифровому выводу, отличному от «13», не забывайте о токоограничивающем резисторе примерно на 220 Ом.

Схема подключения к Arduino для демонстрации параллельных потоков

2 Управление светодиодом и пьезоизлучателемс помощью оператора delay()

Напишем вот такой скетч и загрузим его в Ардуино.

После включения видно, что скетч выполняется не совсем так как нам нужно: пока полностью не отработает сирена, светодиод не мигнёт, а мы бы хотели, чтобы светодиод мигал во время звучания сирены. В чём же здесь проблема?

Дело в том, что обычным образом эту задачу не решить. Задачи выполняются микроконтроллером строго последовательно. Оператор delay() задерживает выполнение программы на указанный промежуток времени, и пока это время не истечёт, следующие команды программы не будут выполняться. Из-за этого мы не можем задать разную длительность выполнения для каждой задачи в цикле loop() программы. Поэтому нужно как-то сымитировать многозадачность.

3 Параллельные процессы без оператора «delay()»

Вариант, при котором Arduino будет выполнять задачи псевдо-параллельно, предложен разработчиками Ардуино. Суть метода в том, что при каждом повторении цикла loop() мы проверяем, настало ли время мигать светодиодом (выполнять фоновую задачу) или нет. И если настало, то инвертируем состояние светодиода. Это своеобразный вариант обхода оператора delay().

Существенным недостатком данного метода является то, что участок кода перед блоком управления светодиодом должен выполняться быстрее, чем интервал времени мигания светодиода «ledInterval». В противном случае мигание будет происходить реже, чем нужно, и эффекта параллельного выполнения задач мы не получим. В частности, в нашем скетче длительность изменения звука сирены составляет 200+200+200+200 = 800 мсек, а интервал мигания светодиодом мы задали 200 мсек. Но светодиод будет мигать с периодом 800 мсек, что в 4 раза больше того, что мы задали.

Вообще, если в коде используется оператор delay(), в таком случае трудно сымитировать псевдо-параллельность, поэтому желательно его избегать.

В данном случае нужно было бы для блока управления звуком сирены также проверять, пришло время или нет, а не использовать delay(). Но это бы увеличило количество кода и ухудшило читаемость программы.

4 Использование библиотеки ArduinoThreadдля создания параллельных потоков

Чтобы решить поставленную задачу, воспользуемся замечательной библиотекой ArduinoThread, которая позволяет с лёгкостью создавать псевдо-параллельные процессы. Она работает похожим образом, но позволяет не писать код по проверке времени – нужно выполнять задачу в этом цикле или не нужно. Благодаря этому сокращается объём кода и улучшается читаемость скетча. Давайте проверим библиотеку в действии.

Первым делом скачаем с официального сайта архив библиотеки и разархивируем его в директорию libraries/ среды разработки Arduino IDE. Затем переименуем папку ArduinoThread-master в ArduinoThread.

Схема подключений останется прежней. Изменится лишь код программы.

В программе мы создаём два потока – ledThread и soundThread, каждый выполняет свою операцию: один мигает светодиодом, второй управляет звуком сирены. В каждой итерации цикла для каждого потока проверяем, пришло ли время его выполнения или нет. Если пришло – он запускается на исполнение с помощью метода run(). Главное – не использовать оператор delay(). В коде даны более подробные пояснения.

Параллельное выполнение потоков на Arduino

Загрузим код в память Ардуино, запустим. Теперь всё работает в точности так, как надо!

Использование потоков и работа с потоками Using threads and threading

С помощью языка .NET можно создавать приложения, которые выполняют несколько операций одновременно. With .NET, you can write applications that perform multiple operations at the same time. Операции, которые потенциально могут задержать выполнение других операций, выполняются в отдельных потоках; такой способ организации работы приложения называется многопоточностью или свободным созданием потоков. Operations with the potential of holding up other operations can execute on separate threads, a process known as multithreading or free threading.

Приложения, использующие многопоточность, более оперативно реагируют на действия пользователя, поскольку пользовательский интерфейс остается активным, в то время как задачи, требующие интенсивной работы процессора, выполняются в других потоках. Applications that use multithreading are more responsive to user input because the user interface stays active as processor-intensive tasks execute on separate threads. Многопоточность также эффективна при создании масштабируемых приложений, поскольку пользователь может добавлять потоки при увеличении рабочей нагрузки. Multithreading is also useful when you create scalable applications, because you can add threads as the workload increases.

Если требуется больший контроль над поведением потоков приложения, можно управлять потоками самостоятельно. If you need more control over the behavior of the application’s threads, you can manage the threads yourself. Но начиная с .NET Framework 4 многопоточное программирование значительно упростилось благодаря классам System.Threading.Tasks.Parallel и System.Threading.Tasks.Task, Parallel LINQ (PLINQ), новым классам параллельных коллекций из пространства имен System.Collections.Concurrent и новой модели программирования, которая вместо потоков использует концепцию задач. However, starting with the .NET Framework 4, multithreaded programming is greatly simplified with the System.Threading.Tasks.Parallel and System.Threading.Tasks.Task classes, Parallel LINQ (PLINQ), new concurrent collection classes in the System.Collections.Concurrent namespace, and a new programming model that is based on the concept of tasks rather than threads. Дополнительные сведения см. в статье Параллельное программирование в .NET и Библиотека параллельных задач (TPL). For more information, see Parallel Programming and Task Parallel Library (TPL).

Практическое руководство. Создание и запуск нового потока How to: Create and start a new thread

Чтобы создать поток, создайте новый экземпляр класса System.Threading.Thread и укажите в конструкторе имя метода, который должен выполняться в новом потоке. You create a new thread by creating a new instance of the System.Threading.Thread class and providing the name of the method that you want to execute on a new thread to the constructor. Чтобы запустить созданный поток, вызовите метод Thread.Start. To start a created thread, call the Thread.Start method. Дополнительные сведения и примеры см. в статье Создание потоков и передача данных во время запуска и справочнике по API Thread. For more information and examples, see the Creating threads and passing data at start time article and the Thread API reference.

Практическое руководство. Остановка потока How to: Stop a thread

Чтобы прервать выполнение потока, используйте метод Thread.Abort. To terminate the execution of a thread, use the Thread.Abort method. Этот метод вызывает ThreadAbortException в потоке, для которого был вызван. That method raises a ThreadAbortException on the thread on which it’s invoked. Дополнительные сведения см. в разделе Уничтожение потоков. For more information, see Destroying threads.


Начиная с .NET Framework 4 вы можете использовать System.Threading.CancellationToken для совместной отмены потока. Beginning with the .NET Framework 4, you can use the System.Threading.CancellationToken to cancel a thread cooperatively. Подробные сведения см. в статье Отмена в управляемых потоках. For more information, see Cancellation in managed threads.

Используйте метод Thread.Join, чтобы вызывающий поток ждал завершения потока, для которого был вызван метод. Use the Thread.Join method to make the calling thread wait for the termination of the thread on which the method is invoked.

Практическое руководство. Приостановка или прерывание потока How to: Pause or interrupt a thread

Используйте метод Thread.Sleep, чтобы приостановить текущий поток на определенное время. You use the Thread.Sleep method to pause the current thread for a specified amount of time. Заблокированный поток можно прервать путем вызова метода Thread.Interrupt. You can interrupt a blocked thread by calling the Thread.Interrupt method. Дополнительные сведения см. в разделе Приостановка и прерывание потоков. For more information, see Pausing and interrupting threads.

Свойства потока Thread properties

В приведенной ниже таблице показаны некоторые свойства Thread. The following table presents some of the Thread properties:

Многопоточное программирование в Java 8. Часть первая. Параллельное выполнение кода с помощью потоков

Добро пожаловать в первую часть руководства по параллельному программированию в Java 8. В этой части мы на простых примерах рассмотрим, как выполнять код параллельно с помощью потоков, задач и сервисов исполнителей.

Впервые Concurrency API был представлен вместе с выходом Java 5 и с тех пор постоянно развивался с каждой новой версией Java. Большую часть примеров можно реализовать на более старых версиях, однако в этой статье я собираюсь использовать лямбда-выражения. Если вы все еще не знакомы с нововведениями Java 8, рекомендую посмотреть мое руководство.

Потоки и задачи

Все современные операционные системы поддерживают параллельное выполнение кода с помощью процессов и потоков. Процесс — это экземпляр программы, который запускается независимо от остальных. Например, когда вы запускаете программу на Java, ОС создает новый процесс, который работает параллельно другим. Внутри процессов мы можем использовать потоки, тем самым выжав из процессора максимум возможностей.

Потоки (threads) в Java поддерживаются начиная с JDK 1.0. Прежде чем запустить поток, ему надо предоставить участок кода, который обычно называется «задачей» (task). Это делается через реализацию интерфейса Runnable , у которого есть только один метод без аргументов, возвращающий void — run() . Вот пример того, как это работает:

Поскольку интерфейс Runnable функциональный, мы можем использовать лямбда-выражения, которые появились в Java 8. В примере мы создаем задачу, которая выводит имя текущего потока на консоль, и запускаем ее сначала в главном потоке, а затем — в отдельном.

Результат выполнения этого кода может выглядеть так:

Из-за параллельного выполнения мы не можем сказать, будет наш поток запущен до или после вывода «Done!» на экран. Эта особенность делает параллельное программирование сложной задачей в больших приложениях.

15–16 ноября, Минск, 133–390 br

Потоки могут быть приостановлены на некоторое время. Это весьма полезно, если мы хотим сэмулировать долго выполняющуюся задачу. Например, так:

Когда вы запустите этот код, вы увидите секундную задержку между выводом первой и второй строки на экран. TimeUnit — полезный класс для работы с единицами времени, но то же самое можно сделать с помощью Thread.sleep(1000) .

Работать с потоками напрямую неудобно и чревато ошибками. Поэтому в 2004 году в Java 5 добавили Concurrency API. Он находится в пакете java.util.concurrent и содержит большое количество полезных классов и методов для многопоточного программирования. С тех пор Concurrency API непрерывно развивался и развивается.

Давайте теперь подробнее рассмотрим одну из самых важных частей Concurrency API — сервис исполнителей (executor services).

Исполнители

Concurrency API вводит понятие сервиса-исполнителя (ExecutorService) — высокоуровневую замену работе с потоками напрямую. Исполнители выполняют задачи асинхронно и обычно используют пул потоков, так что нам не надо создавать их вручную. Все потоки из пула будут использованы повторно после выполнения задачи, а значит, мы можем создать в приложении столько задач, сколько хотим, используя один исполнитель.

Вот как будет выглядеть наш первый пример с использованием исполнителя:

Класс Executors предоставляет удобные методы-фабрики для создания различных сервисов исполнителей. В данном случае мы использовали исполнитель с одним потоком.

Результат выглядит так же, как в прошлый раз. Но у этого кода есть важное отличие — он никогда не остановится. Работу исполнителей надо завершать явно. Для этого в интерфейсе ExecutorService есть два метода: shutdown() , который ждет завершения запущенных задач, и shutdownNow() , который останавливает исполнитель немедленно.

Вот как я предпочитаю останавливать исполнителей:

Исполнитель пытается завершить работу, ожидая завершения запущенных задач в течение определенного времени (5 секунд). По истечении этого времени он останавливается, прерывая все незавершенные задачи.

Callable и Future

Кроме Runnable , исполнители могут принимать другой вид задач, который называется Callable . Callable — это также функциональный интерфейс, но, в отличие от Runnable , он может возвращать значение.

Давайте напишем задачу, которая возвращает целое число после секундной паузы:

Callable-задачи также могут быть переданы исполнителям. Но как тогда получить результат, который они возвращают? Поскольку метод submit() не ждет завершения задачи, исполнитель не может вернуть результат задачи напрямую. Вместо этого исполнитель возвращает специальный объект Future, у которого мы сможем запросить результат задачи.

После отправки задачи исполнителю мы сначала проверяем, завершено ли ее выполнение, с помощью метода isDone() . Поскольку задача имеет задержку в одну секунду, прежде чем вернуть число, я более чем уверен, что она еще не завершена.

Вызов метода get() блокирует поток и ждет завершения задачи, а затем возвращает результат ее выполнения. Теперь future.isDone() вернет true , и мы увидим на консоли следующее:

Задачи жестко связаны с сервисом исполнителей, и, если вы его остановите, попытка получить результат задачи выбросит исключение:

Вы, возможно, заметили, что на этот раз мы создаем сервис немного по-другому: с помощью метода newFixedThreadPool(1) , который вернет исполнителя с пулом в один поток. Это эквивалентно вызову метода newSingleThreadExecutor() , однако мы можем изменить количество потоков в пуле.

Таймауты

Любой вызов метода future.get() блокирует поток до тех пор, пока задача не будет завершена. В наихудшем случае выполнение задачи не завершится никогда, блокируя ваше приложение. Избежать этого можно, передав таймаут:

Выполнение этого кода вызовет TimeoutException :

Вы уже, возможно, догадались, почему было выброшено это исключение: мы указали максимальное время ожидания выполнения задачи в одну секунду, в то время как ее выполнение занимает две.

InvokeAll

Исполнители могут принимать список задач на выполнение с помощью метода invokeAll() , который принимает коллекцию callable-задач и возвращает список из Future .

В этом примере мы использовали функциональные потоки Java 8 для обработки задач, возвращенных методом invokeAll . Мы прошлись по всем задачам и вывели их результат на консоль. Если вы не знакомы с потоками (streams) Java 8, смотрите мое руководство.

InvokeAny

Другой способ отдать на выполнение несколько задач — метод invokeAny() . Он работает немного по-другому: вместо возврата Future он блокирует поток до того, как завершится хоть одна задача, и возвращает ее результат.

Чтобы показать, как работает этот метод, создадим метод, эмулирующий поведение различных задач. Он будет возвращать Callable , который вернет указанную строку после необходимой задержки:

Используем этот метод, чтобы создать несколько задач с разными строками и задержками от одной до трех секунд. Отправка этих задач исполнителю через метод invokeAny() вернет результат задачи с наименьшей задержкой. В данном случае это «task2»:

В примере выше использован еще один вид исполнителей, который создается с помощью метода newWorkStealingPool() . Этот метод появился в Java 8 и ведет себя не так, как другие: вместо использования фиксированного количества потоков он создает ForkJoinPool с определенным параллелизмом (parallelism size), по умолчанию равным количеству ядер машины.

ForkJoinPool впервые появился в Java 7, и мы рассмотрим его подробнее в следующих частях нашего руководства. А теперь давайте посмотрим на исполнители с планировщиком (scheduled executors).

Исполнители с планировщиком

Мы уже знаем, как отдать задачу исполнителю и получить ее результат. Для того, чтобы периодически запускать задачу, мы можем использовать пул потоков с планировщиком.

ScheduledExecutorService способен запускать задачи один или несколько раз с заданным интервалом.

Этот пример показывает, как заставить исполнитель выполнить задачу через три секунды:

Когда мы передаем задачу планировщику, он возвращает особый тип Future — ScheduledFuture , который предоставляет метод getDelay() для получения оставшегося до запуска времени.

У исполнителя с планировщиком есть два метода для установки задач: scheduleAtFixedRate() и scheduleWithFixedDelay() . Первый устанавливает задачи с определенным интервалом, например, в одну секунду:

Кроме того, он принимает начальную задержку, которая определяет время до первого запуска.

Обратите внимание, что метод scheduleAtFixedRate() не берет в расчет время выполнения задачи. Так, если вы поставите задачу, которая выполняется две секунды, с интервалом в одну, пул потоков рано или поздно переполнится.

В этом случае необходимо использовать метод scheduleWithFixedDelay() . Он работает примерно так же, как и предыдущий, но указанный интервал будет отсчитываться от времени завершения предыдущей задачи.

В этом примере мы ставим задачу с задержкой в одну секунду между окончанием выполнения задачи и началом следующей. Начальной задержки нет, и каждая задача выполняется две секунды. Так, задачи будут запускаться на 0, 3, 6, 9 и т. д. секунде. Как видите, метод scheduleWithFixedDelay() весьма полезен, если мы не можем заранее сказать, сколько будет выполняться задача.

Это была первая часть серии статей про многопоточное программирование. Настоятельно рекомендую разобрать вышеприведенные примеры самостоятельно. Все они доступны на GitHub. Можете смело форкать репозиторий и добавлять его в избранное.

Надеюсь, вам понравилась статья. Если у вас возникли какие-либо вопросы, вы можете задать их в твиттере.

20 типичных ошибок многопоточности в C++

Многопоточное программирование одна из самых сложных тем в программировании, особенно в C++. Трудно избежать при этом ошибок. К счастью большую часть удаётся отловить на этапе проверки кода или тестирования. Но особо коварные проникают в рабочие системы и исправлять их достаточно затруднительно.

В этой статье собраны и переведены самые значимые по мнению автора заметки ошибочные ситуации. Если у вас есть свои любимые ошибки или варианты их решения, оставьте, пожалуйста, их в комментариях.

Все примеры успешно компилируются и исполняются в Ubuntu 16.04 LTS:

#1 Отсутствие join() или detach() перед завершением

Если забыть вызвать join() или detach() перед завершением программы, это может привести к её аварийному завершению.

В конце функции main() объект t выходит из области видимости и вызывается деструктор. Внутри деструктора выполняется проверка на подключаемость потока. Подключаемый поток — это поток который может или уже выполняется. В данном случае это именно так поэтому будет вызвана функция std::terminate() .

В зависимости желаемого поведения следует либо подождать завершения потока:

либо разорвать с ним связь

#2 Попытка дождаться завершения неподключаемого потока

Для объектов std::thread которые были перемещены, завершены join() или брошены detach() нельзя дождаться завершения.

В таких случаях следует проверять, а можно ли в принципе подключить поток, и только потом уже вызывать join() .

#3 Вызов join() блокирует вызывающий поток

В реальных приложениях потоки могут обслуживать достаточно длительные операции связанные с сетевым вводом/выводом или реакцией пользователя в пользовательском интерфейсе. Попытка дождаться завершения таких потоков в основном потоке или в потоке отвечающим за интерфейс может привести к «заморозке». Лучше найти другое решение.


Например, для десктопного приложения вспомогательный поток перед своим завершением может послать сообщение в поток отвечающий за интерфейс. Последний как правило организован как обработчик очереди сообщений. Чаще всего это сообщения от элементов интерфейса, но могут быть и нажатия клавиш и даже перемещения мыши. В этом потоке точно так же можно получить сообщение от вспомогательного потока и среагировать на завершение, не дожидаясь его буквально, и не блокируя ожиданием обработку событий.

#4 Не учитывать особенности передачи аргументов в поток

Аргументы в функцию потока перемещаются или копируются по значению. Пример ниже даже не скомпилируется.

Для успешной компиляции ссылка должна быть передана через std::ref

#5 Игнорировать общий доступ к ресурсам

В условиях многопоточности несколько потоков могут одновременно использовать общие ресурсы или данные. Это может приводить не только к труднопрогнозируемому поведению но и к краху программы. В таких случаях необходимо упорядочивать и контролировать доступ.

В качестве примера рассмотрим вывод в консоль в несколько потоков — основной и шесть дополнительных.

В результате получим мешанину из слов:

Дело в том, что консоль одна, а семь потоков пытаются выводить на неё одновременно. Чтобы сделать вывод более предсказуемым необходимо ограничить одновременный доступ. Сделаем это через std::mutex — заблокируем до вывода и освободим после.

Вывод получится читаемый:

Мешанины из фрагментов строк уже нет. Порядок захвата нами не управляется, поэтому сами сообщения появляются в произвольном порядке.

#6 Забыть вызвать unlock()

В предыдущем примере для разделения доступа к ресурсу использовался std::mutex . Это не самый удачный способ, поскольку вызова unlock() мы можем и не достичь, если вообще не забыли его вызвать.

После появления первого же сообщения программа зависнет:

Чтобы защититься от ошибок такого рода воспользуемся std::lock_guard , который манипулирует временем жизни блокировки в стиле RAII.

В конструкторе захватывает, в деструкторе освобождает. По какой бы причине мы не покинули область видимости — блокировка будет снята.

#7 Пренебрежение размером защищённой секции

Пока мы находимся внутри защищённой секции все остальные потоки рвущиеся её выполнить заблокированы. Старайтесь делать заблокированный участок как можно меньше.

Безопасный вариант программы будет исполняться почти две секунды. Если нам необходимо защитить только вывод в консоль, то нет необходимости в секции такого размера. Мы можем переместить блокировки ближе к выводу.

Такой вариант будет выполняться уже около одной секунды и при этом останется безопасным.

#8 Взаимные блокировки

Как правило такие блокировки уже навсегда из-за чего получили название deadlock. Типичная ситуация такой блокировки представлена ниже. Функция sleep_for даёт нам 100% шанс попасть в вечную блокировку, без неё скорее всего на любой машине этот код выполнился бы без зависания.

Причина зависания кроется в перекрёстной блокировке. Когда оба потока начинают работать каждый из них блокирует свой mutex. То есть t1 захватил cerr_mutex, а t2 — cout_mutex. После вызова sleep_for потоки пытаются захватить их наоборот, еще не освободив занятые. Для того чтобы освободить свой mutex потоку приходится ждать пока это сделает второй, а у второго ситуация ровно такая же.

Самое простое решение использовать std::lock для захвата обоих mutex.

Если условия позволяют, можно использовать std::timed_mutex . Для выхода из перекрёстной блокировки достаточно, чтобы одну из них можно было нарушить.

С одной стороны мы успешно избежали блокировки, но с другой стороны нам пришлось взять на себя обработку этой ситуации.

#9 Повторный захват std::mutex

Эта некоторая вариация на тему перекрёстного захвата с тем лишь отличием, что для такой блокировки достаточно одного std::mutex. Даже дополнительные потоки не нужны. Тут можно было бы привести в качестве примера страшную банальщину типа:

Да, это именно такого рода ошибка, вот только в жизни она встречается в несколько более изощрённой форме. В примере ниже нет ни потоков, ни рекурсии и всего один mutex.

Вызов foo приводит к захвату mutex, но в процессе вызова bar возникает необходимость блокировки ресурса повторно. Это такой же deadlock, только теперь основной поток ждёт сам себя.

Существует очевидный способ решить проблему — заменить обычный std::mutex на рекурсивный std::recursive_mutex и это решит нашу проблему, но решит её ой каким опасным способом. Тысячу раз подумайте всё ли будет в порядке при таком подходе, не удастся ли найти решение более элегантное

#10 Излишняя предосторожность

Когда возникает необходимость модифицировать простые типы наподобие bool или int использование ‘std::atomic’ почти всегда более эффективно в сравнении с использованием mutex.

Та же самая логика без использования mutex и использованием atomic.

#11 Частое создание потоков без использования пулов

Создание и удаление потоков дорогое удовольствия с точки зрения затрат CPU. Часто создавать и удалять потоки в приложении, которое само по активно занимается вычислениями значит мешать ему. Вместо того, чтобы часто создавать и удалять потоки лучше создать пул предварительно запущенных потоков и распределять между ними задания.

Использование пула потоков позволяет сократить объём кода и количество потенциальных ошибок связанных с запуском и остановкой потоков. Позволит избежать избыточного количества потоков, которое может негативно сказаться на производительности.

Существуют готовые реализации такого рода пулов. Например, TBB

#12 Не обработанные исключения в потоке

Исключения брошенные в одном потоке не могут быть перехвачены другим. Представим себе функцию, которая может бросить исключение. Если мы выполним эту функцию в отдельном потоке, то попытка поймать исключение в основном не сработает.

Программа аварийно завершится так и не вызвав обработчик исключения. Решением может быть перехват исключения в потоке и передача информации о нём через экземпляр std::exception_ptr в родительский поток и повторного бросания уже за пределами породившего исключения потока.

#13 Имитация асинхронной работы без std::async

Когда нужно выполнить часть кода независимо от основного потока отличным выбором будет использование std::async для запуска. Это тоже самое, что создать ещё один поток и передать ему на выполнение функцию или лямбду. Правда при этом за жизненным циклом потока и исключений в нём тоже придётся следить самостоятельно. В случае использования std::async можно не заботиться об этом да ещё и существенно сократить вероятность блокировки.

Еще одно важно преимущество заключается в возможности получить результат работы функции через std::future . Функция int foo() , будучи выполнена как асинхронная задача, заранее установит результат своей работы. А получим мы его тогда, когда нам это будет удобно.

Использование потоков напрямую делает получение результатов чуть более громоздким. Это может быть:

Передача ссылки на результирующую переменную в поток, в котором необходимо сохранить результат.

Сохранение результата в переменной класса функционального объекта и чтение его после завершения работы.

Курт Гантерот в своей книге утверждает, что создание потоков в 14 раз дороже использования std::async .

Короче говоря, пока не доказано обратное использовать следует std::async .

#14 Опускание std::launch::async когда это действительно необходимо

Название std::async может ввести в заблуждение, потому что функция, которая будет передана для запуска по умолчанию может и не запуститься отдельно от вызываемого потока!

Существует два способа запуска:

  1. std::launch::async . Задача будет немедленно запущена в отдельном потоке.
  2. std::launch::deferred . Выполнение задачи будет отложено до вызова .get() или .wait() возвращаемого объекта std::future . При этом выполнение осуществляется синхронно!

Без явного указания способа запуска предполагается комбинация этих вариантов и фактически предсказать как именно будет запущена задача невозможно. Существуют связанные с этим сложности, например невозможно предсказать корректно ли будет обращение к переменным потока, невозможно предсказать будет ли выполнена функция вообще, если до выполнения функций .get() или .wait() дело так и не дошло ну и цикл ожидания готовности future никогда не закончится для отложенного сценария, ждать бесполезно.

Чтобы избежать недоразумений явно указывайте std::launch::async при запуске std::async .

Гарантировано в другом потоке:

#15 Использование .get() может привести к ожиданию

Несмотря на то, что лямбда явно будет запущена в отдельном потоке вызов метода .get() может привести к нежелательному ожиданию. Более того, на следующей итерации код вообще аварийно завершится, потому что первый результат уже был выведен на консоль а никакого другого во future нет.

Обе проблемы можно решить проверив future на готовность.

#16 Исключение из задачи перетечёт во future

Исключение брошенное в асинхронной задаче должно быть обработано так, будто оно возникло в вызываемом потоке. В данном случае пример себя поведёт так, будто исключение никто не обработал.

Программа аварийно завершится. Если в задаче было брошено исключение, оно распространится и на вызов .get(). Если до конца жизни future .get() так и не будет вызван исключение просто проигнорируется.

Для таких задач имеет смысл использование обычной конструкции try/catch.

#17 Использование std::async там, где нужен тонкий контроль потоков

В большинстве случаев достаточно использования std::async , кроме ситуаций когда возникает необходимость в полном контроле над исполняемым потоком.

Например изменить параметры для планировщика:

Это возможно благодаря наличию метода .native_handle() у std::thread , значение которого можно использовать в POSIX системах. Использование этого метода полезно всегда, когда не хватает функциональности ни std::async ни std::thread . Использование std::async скрывает детали реализации и непригодно для такой тонкой работы.

#18 Пренебрежение анализом нагрузки на CPU

В любой момент времени потоки можно разделить на две группы — те которые что-то делают и те, который спят.

Потоки которые что-то делают занимают ядра процессора на которые их отправил планировщик. Для потоков которые занимаются активными вычислениями важно иметь в своём распоряжении свободные ядра, в противном случае большое количество потоков не даст никакого прироста в производительности. Даже скорее наоборот снизит производительность за счёт дополнительных переключений контекста потока.

Потоки которые преимущественно находятся в режиме ожидания в таком внимании процессора не нуждаются и могут находится в системе в гораздо большем количестве. И в случае с вводом/выводом даже помогают увеличить пропускную способность.

Я рассмотрел два крайних варианта, но наш конечно же будет посередине. Да, есть метод std::thread::hardware_concurrency() , которая сообщит нам сколько ядер доступно планировщику с учётом физических и логических.

Но это не помогает ответить правильно на вопрос — сколько же потоков можно запустить одновременно? Число ядер помогает понять сколько одновременно потоков, которые непрерывно длительное время активно потребляют процессор.


Если сценарий использования вычислений именно такой,то количество потоков должно быть как можно ближе к количеству ядер, а то и меньше, чтобы исключить распределение на логических ядрах.

Если потоки преимущественно заблокированы мьютексами или вводом/выводом, то ограничивать количество потоков ради экономии процессора не имеет большого смысла.

В остальных случаях необходимо нагрузочное тестирование и мониторинг с регулировкой количества потоков. Иными словами подбирается экспериментально.

#19 Использование квалификатора volatile для синхронизации

Использование этого квалификатора указание компилятору того, что изменения объекта могут происходить без контроля на этапе компиляции. Не в том смысле, что они происходят из разных потоков, а в том, что вообще за пределами кода, грубо говоря самопроизвольно. Это очень низкоуровневое указание и это никак не помогает в одновременном доступе внутри процесса.

Для синхронизации следует использовать atomic , mutex , и condition_variable .

#20 Неоправданное использование lock-free алгоритмов

Программирование без необходимости блокировок звучит очень привлекательно в сравнении с обычными механизмами синхронизации. Возможно, в случае жёсткого ограничения вычислительных ресурсов применение подобных алгоритмов может быть оправдано. В остальных случаях выглядит скорее преждевременной оптимизацией, которая, к тому же, может обернуться сложными ошибками в самое неподходящее время (и без coredump тут не обойтись).

Прежде чем приступить к использованию свободных от блокировок алгоритмов следует подумать над тремя вопросами:

  1. Пробовали ли спроектировать код без необходимости синхронизации?
  2. Выполняли ли анализ производительности, поиск и оптимизацию узких мест?
  3. Можно ли ограничиться горизонтальным масштабированием?

Использование lock-free алгоритмов оправдано, когда никаких других решений просто не осталось.

Надеюсь, чтение этого материала было так же полезно, как его перевод и публикация.

Параллельные потоки и управление?

У меня есть такая функция:

Чтобы свести к минимуму время, затрачиваемое на получение всех необходимых данных, я хотел бы одновременно выполнять функцию 4 раза с разными диапазонами, а затем объединять результаты или использовать список безопасных потоков или словарь.

Мой вопрос в том, как я мог запустить эту функцию в 4 отдельных потоках и все еще иметь возможность контролировать, если один из потоков по-прежнему работает или не знает, когда он заканчивается?

Моя первая идея заключалась в объявлении каждого потока:

Затем я запустил бы контроллер и изнутри контроллера я бы назвал каждый поток, чтобы выполнить вышеописанную функцию, и отслеживать каждый поток от контроллера, что-то вроде:

Размышляя над всем этим беспорядком выше, я понимаю, что это не лучший способ сделать то, что я хотел, и вот я.).

Мастер Йода рекомендует:  PHP для чайников

Как я могу управлять более чем 1 потоком для выполнения одной и той же функции и все же иметь возможность знать, когда каждый поток закончил работу, чтобы закрыть или сохранить или объединить данные или сделать все, что осталось от основного потока?

Threading — управление параллельными потоками

Разбиение C++ программ на множество потоков

Непрерывное усложнение компьютерных систем вселяет в нас надежду, что мы и в дальнейшем сможем успешно управлять этим видом абстракции.

(— Эндрю Кёниг и Барбара My(Andrew Koening and Barbara Moo), Ruminations on С++ )

Работу любой последовательной программы можно разделить между несколькими подпрограммами. Каждой подпрограмме назначается конкретная задача, и все эти задачи выполняются одна за другой. Вторая задача не может начаться до тех пор, пока не завершится первая, а третья — пока не закончится вторая и т.д. Описанная схема прекрасно работает до тех пор, пока не будут достигнуты границы производительности и сложности. В одних случаях единственное решение проблемы производительности — найти возможность выполнять одновременно более одной задачи. В других ситуациях работа подпрограмм в программе настолько сложна, что имеет смысл представить эти подпрограммы в виде мини-программ, которые выполняются параллельно внутри основной программы. В главе 3 были представлены методы разбиения одной программы на несколько процессов, каждый из которых выполняет отдельную задачу. Такие методы позволяют приложению в каждый момент времени выполнять сразу несколько действий. Однако в этом случае каждый процесс имеет собственные адресное пространство и ресурсы. Поскольку каждый процесс занимает отдельное адресное пространство, то взаимодействие между процессами превращается в настоящую проблему. Для обеспечения связи между раздельно выполняемыми частями общей программы нужно реализовать такие средства межпроцессного взаимодействия, как каналы, FIFO-очереди (с дисциплиной обслуживания по принципу «первым пришел — первым обслужен») и переменные среды. Иногда нужно иметь одну программу (которая выполняет несколько задач одновременно), не разбивая ее на множество мини-программ. В таких обстоятельствах можно использовать потоки. Потоки позволяют одной программе состоять из параллельно выполняемых частей, причем все части имеют доступ к одним и тем же переменным, константам и адресному пространству в целом. Потоки можно рассматривать как мини-программы в основной программе. Если программа разделена на несколько процессов, как было показано в главе 3 , то с выполнением каждого отдельного процесса связаны определенные затраты системных ресурсов. Для потоков требуется меньший объем затрат системных ресурсов. Поэтому потоки можно рассматривать как облегченные процессы, т.е. они позволяют воспользоваться многими преимуществами процессов без больших затрат на организацию взаимодействия между ними. Потоки обеспечивают средства разделения основного «русла» программы на несколько параллельно выполняемых «ручейков».

Под потоком подразумевается часть выполняемого кода в UNIX- или Linux-процессе, которая может быть регламентирована определенным образом. Затраты вычислительных ресурсов, связанные с созданием потока, его поддержкой и управлением, у операционной системы значительно ниже по сравнению с аналогичными затратами для процессов, поскольку объем информации отдельного потока гораздо меньше, чем у процесса. Каждый процесс имеет основной, или первичный, поток. Под основным потоком процесса понимается программный поток управления или поток выполнения. Процесс может иметь несколько потоков выполнения и, соответственно, столько же потоков управления. Каждый поток, имея собственную последовательность инструкций, выполняется независимо от других, а все они — параллельно друг другу. Процесс с несколькими потоками, называется многопоточным. Многопоточный процесс, состоящий из нескольких потоков, показан на рис. 4.1.

Рис. 4.1. Потоки выполнения многопоточного процесса

Контекстные требования потока

Все потоки одного процесса существуют в одном и том же адресном пространстве. Все ресурсы, принадлежащие процессу, разделяются между потоками. Потоки не владеют никакими ресурсами. Ресурсы, которыми владеет процесс, совместно используются всеми потоками этого процесса. Потоки разделяют дескрипторы файлов и файловые указатели, но каждый поток имеет собственные программный указатель, набор регистров, состояние и стек. Все стеки потоков находятся в стековом разделе своего процесса. Раздел данных процесса совместно используется потоками процесса. Поток может считывать (и записывать) информацию из области памяти своего процесса. Когда основной поток записывает данные в память, то любые сыновние потоки могут получить к ним доступ. Потоки могут создавать другие потоки в пределах того же процесса. Все потоки в одном процессе считаются равноправными. Потоки также могут приостановить, возобновить или завершить другие потоки в своем процессе.

Потоки — это выполняемые части программы, которые соревнуются за использование процессора с потоками того же самого или других процессов. В многопроцессорной системе потоки одного процесса могут выполняться одновременно на различных процессорах. Однако потоки конкретного процесса выполняются только на процессоре, который назначен этому процессу. Если, например, процессоры 1, 2 и 3 назначены процессу А, а процесс А имеет три потока, то любой из них может быть назначен любому процессору. В среде с одним процессором потоки конкурируют за его использование. Параллельность же достигается за счет переключения контекста. Контекст переключается, если операционная система поддерживает многозадачность при наличии единственного процессора. Многозадачность позволяет на одном процессоре одновременно выполнять несколько задач. Каждая задача выполняется в течение выделенного интервала времени. По истечении заданного интервала или после наступления некоторого события текущая задача снимается с процессора, а ему назначается другая задача. Когда потоки выполняются параллельно в одном процессе, то о таком процессе говорят, что он — многопоточный. Каждый поток выполняет свою подзадачу таким образом, что подзадачи процесса могут выполняться независимо от основного потока управления процесса. При многозадачности потоки могут конкурировать за использование одного процессора или назначаться другим процессорам. Но в любом случае переключение контекста между потоками одного и того же процесса требует меньше ресурсов, чем переключение контекста между потоками различных процессов. Процесс использует много системных ресурсов для отслеживания соответствующей информации, а на управление этой информацией при переключении контекста между процессами требуется значительное время. Большая часть информации, содержащейся в контексте процесса, описывает адресное пространство процесса и ресурсы, которыми он владеет. Переключаясь между потоками, определенными в различных адресных пространствах, контекст переключается и между процессами. Поскольку потоки в рамках одного процесса не имеют собственного адресного пространства (или ресурсов), то операционной системе приходится отслеживать меньший объем информации. Контекст потока состоит только из идентификационного номера (id), стека, набора регистров и приоритета. В регистрах содержится программный указатель и указатель стека. Текст (программный код) потока содержится в текстовом разделе соответствующего процесса. Поэтому переключение контекста между потоками одного процесса займет меньше времени и потребует меньшего объема системных ресурсов.

Сравнение потоков и процессов

У потоков и процессов есть много общего. Они имеют идентификационный номер (id), состояние, набор регистров, приоритет и привязку к определенной стратегии планирования. Подобно процессам, потоки имеют атрибуты, которые описывают их для операционной системы. Эта информация содержится в информационном блоке потока, подобном информационному блоку процесса. Потоки и сыновние процессы разделяют ресурсы родительского процесса. Ресурсы, открытые родительским процессом (в его основном потоке), немедленно становятся доступными всем потокам и сыновним процессам. При этом никакой дополнительной инициализации или подготовки не требуется. Потоки и сыновние процессы независимы от родителя (создателя) и конкурируют за использование процессора. Создатель процесса или потока управляет своим потомком, т.е. он может отменить, приостановить или возобновить его выполнение либо изменить его приоритет. Поток или процесс может изменить свои атрибуты и создать новые ресурсы, но не может получить доступ к ресурсам, принадлежащим другим процессам. Однако между потоками и процессами есть множество различий.

Различия между потоками и процессами

Основное различие между потоками и процессами состоит в том, что каждый процесс имеет собственное адресное пространство, а потоки — нет. Если процесс создает множество потоков, то все они будут содержаться в его адресном пространстве. Вот почему они так легко разделяют общие ресурсы, и так просто обеспечивается взаимодействие между ними. Сыновние процессы имеют собственные адресные пространства и копии разделов данных. Следовательно, когда процесс-потомок изменяет свои переменные или данные, это не влияет на данные родительского процесса. Если необходимо, чтобы родительский и сыновний процессы совместно использовали данные, нужно создать общую область памяти. Для передачи данных между родителем и потомком используются такие механизмы межпроцессного взаимодействия, как каналы и FIFO-очереди. Потоки одного процесса могут передавать информацию и связываться друг с другом путем непосредственного считывания и записи общих данных, которые доступны родительскому процессу.

Потоки, управляющие другими потоками

В то время как процессы могут управлять другими процессами, если между ними установлены отношения типа «родитель-потомок», потоки одного процесса считаются равноправными и находятся на одном уровне, независимо от того, кто кого создал. Любой поток, имеющий доступ к идентификационному номеру (id) некоторого другого потока, может отменить, приостановить, возобновить выполнение э того потока либо изменить его приоритет. Отмена основного потока приведет к завершению всех потоков процесса, т.е. к ликвидации процесса. Любые изменения, внесенные в основной поток, могут повлиять на все потоки процесса. При изменении приоритета процесса все его потоки, которые унаследовали этот приоритет, должны также изменить свои приоритеты. Сходства и различия между потоками и процессами сведены в табл. 4.1.

Таблица 4.1. Сходства и различия между потоками и процессами

• Оба имеют идентификационный номер (id), состояние, набор регистров, приоритет и привязку

к определенной стратегии планирования

• И поток, и процесс имеют атрибуты, которые описывают их особенности для операционной системы

• Как поток, так и процесс имеют информационные блоки

• Оба разделяют ресурсы с родительским процессом

• Оба функционируют независимо от родительского процесса

• Их создатель может управлять потоком или процессом

• И поток, и процесс могут изменять свои атрибуты

• Оба могут создавать новые ресурсы

• Как поток, так и процесс не имеют доступа к ресурсам другого процесса

• Потоки разделяют адресное пространство процесса, который их создал; процессы имеют собственное адресное пространство

• Потоки имеют прямой доступ к разделу данных своего процесса; процессы имеют собственную копию раздела данных родительского процесса

• Потоки могут напрямую взаимодействовать

с другими потоками своего процесса; процессы должны использовать специальный механизм межпроцессного взаимодействия для связи с «братскими» процессами

• Потоки почти не требуют системных затратна поддержку процессов требуются значительные затраты системных ресурсов

• Новые потоки создаются легко; новые процессы требуют дублирования родительского процесса

• Потоки могут в значительной степени управлять потоками того же процесса; процессы управляют только сыновними процессами

• Изменения, вносимые в основной поток (отмена, изменение приоритета и т.д.), могут влиять на поведение других потоков процесса; изменения, вносимые в родительский процесс, не влияют на сыновние процессы

Преимущества использования потоков

При управлении подзадачами приложения использование потоков имеет ряд преимуществ.

• Для переключения контекста требуется меньше системных ресурсов.

• Достигается более высокая производительность приложения.

• Для обеспечения взаимодействия между задачами не требуется никакого специального механизма.

• Программа имеет более простую структуру.

Переключение контекста при низкой (ограниченной) доступности процессора

При организации процесса для выполнения возложенной на него функции может оказаться вполне достаточно одного основного потока. Если же процесс имеет множество параллельных подзадач, то их асинхронное выполнение можно обеспечить с помощью нескольких потоков, на переключение контекста которых потребуются незначительные затраты системных ресурсов. При ограниченной доступности процессора или при наличии в системе только одного процессора параллельное выполнение процессов потребует существенных затрат системных ресурсов в связи с необходимостью обеспечить переключение контекста. В некоторых ситуациях контекст процессов переключается только тогда, когда процессору последовательно назначаются потоки из разных процессов. Под системными затратами подразумеваются не только системные ресурсы, но и время, требуемое на переключение контекста. Но если система содержит достаточное количество процессоров, то переключение контекста не является проблемой.

Возможности повышения производительности приложения

Создание нескольких потоков повышает производительность приложения. При использовании одного потока запрос к устройствам ввода-вывода может остановить весь процесс. Если же в приложении организовано несколько потоков, то пока один из них будет ожидать удовлетворения запроса ввода-вывода, другие потоки, которые не зависят от заблокированного, смогут продолжать выполнение. Тот факт, что не все потоки ожидают завершения операции ввода-вывода, означает, что приложение в целом не заблокировано ожиданием, а продолжает работать.

Простая схема взаимодействия между параллельно выполняющимися потоками

Потоки не требуют специального механизма взаимодействия между подзадачами. Потоки могут напрямую передавать данные другим потокам и получать данные от них, что также способствует экономии системных ресурсов, которые при использовании нескольких процессов пришлось бы направлять на настройку и поддержку специальных механизмов взаимодействия. Потоки же используют общую память, выделяемую в адресном пространстве процесса. Процессы также могут взаимодействовать через общую память, но они имеют раздельные адресные пространства, и поэтому такая общая память должна быть вне адресных пространств обоих взаимодействующих процессов. Этот подход увеличит временные и пространственные расходы системы на поддержку и доступ к общей памяти. Схема взаимодействия между потоками и процессами показана на рис. 4 .2 .

Упрощение структуры программы

Потоки можно использовать, чтобы упростить структуру приложения. Каждому потоку назначается подзадача или подпрограмма, за выполнение которой он отвечает. Поток должен независимо управлять выполнением своей подзадачи. Каждому потоку можно присвоить приоритет, отражающий важность выполняемой им задачи Для приложения. Такой подход позволяет упростить поддержку программного кода.

Недостатки использования потоков

Простота доступности потоков к памяти процесса имеет свои недостатки.

• Потоки могут легко разрушить адресное пространство процесса.

• Потоки необходимо синхронизировать при параллельном доступе (для чтения или записи) к памяти.


• Один поток может ликвидировать целый процесс или программу.

• Потоки существуют только в рамках единого процесса и, следовательно, не являются многократно используемыми.

Рис. 4.2. Взаимодействие между потоками одного процесса и взаимодействие между несколькими процессами

Потоки могут легко разрушить адресное пространство процесса

Потоки могут легко разрушить информацию процесса во время «гонки» данных, если сразу несколько потоков получат доступ для записи одних и тех же данных. При использовании процессов это невозможно. Каждый процесс имеет собственные данные, и другие процессы не в состоянии получить к ним доступ, если не сделать это специально. Защита информации обусловлена наличием у процессов отдельных адресных пространств. Тот факт, что потоки совместно используют одно и то же адресное пространство, делает данные незащищенными от искажения. Например, процесс имеет три потока — А, В и С. Потоки А и В записывают информацию в некоторую область памяти, а поток С считывает из нее значение и использует его для вычислений. Потоки А и В могут попытаться одновременно записать информацию в эту область памяти. Поток В может перезаписать данные, записанные потоком А, еще до того, как поток С получит возможность считать их. Поведение этих потоков должно быть синхронизировано таким образом, чтобы поток С мог считать данные, записанные потоком А, до того, как поток В их перезапишет. Синхронизация защищает данные от перезаписи до их использования. Тема синхронизации потоков рассматривается в главе 5.

Один поток может ликвидировать целую программу

Поскольку потоки не имеют собственного адресного пространства, они не изолированы. Если поток стал причиной фатального нарушения доступа, это может привести к завершению всего процесса. Процессы изолированы друг от друга. Если процесс разрушит свое адресное пространство, проблемы ограничатся этим процессом. Процесс может допустить нарушение доступа, которое приведет к его завершению, но все остальные процессы будут продолжать выполнение. Это нарушение не окажется фатальным для всего приложения. Ошибки, связанные с некорректностью данных, могут не выйти за рамки одного процесса. Но ошибки, вызванные некорректным поведением потока, как правило, гораздо серьезнее ошибок, допущенных процессом. Потоки могут стать причиной ошибок, которые повлияют на все адресное пространство всех потоков. Процессы защищают свои ресурсы от беспорядочного доступа со стороны других процессов. Потоки же совместно используют ресурсы со всеми остальными потоками процесса. Поэтому поток, разрушающий ресурсы, оказывает негативное влияние на процесс или программу в целом.

Потоки не могут многократно использоваться другими программами

Потоки зависят от процесса, в котором они существуют, и их невозможно от него отделить. Процессы отличаются большей степенью независимости, чем потоки. Приложение можно так разделить на задачи, порученные процессам, что эти процессы можно оформить в виде модулей, которые возможно использовать в других приложениях. Потоки не могут существовать вне процессов, в которых они были созданы и, следовательно, они не являются повторно используемыми. Преимущества и недостатки потоков сведены в табл. 4.2.

Образ потока встраивается в образ процесса. Как было описано в главе 3, процесс имеет разделы программного кода, данных и стеков. Поток разделяет разделы кода и данных с остальными потоками процесса. Каждый поток имеет собственный стек, выделенный ему в стековом разделе адресного пространства процесса. Размер потокового стека устанавливается при создании потока. Если создатель потока не определяет размер его стека, то система назначает размер по умолчанию. Размер, устанавливаемый по умолчанию, зависит от конкретной системы, максимально возможного количества потоков в процессе, размера адресного пространства, выделяемого процессу, и пространства, используемого системными ресурсами. Размер потокового стека должен быть достаточно большим для любых функций, вызываемых потоком, любого кода, который является внешним по отношению к процессу (например, это Может быть библиотечный код), и хранения локальных переменных. Процесс с несколькими потоками должен иметь стековый раздел, который будет вмещать все стеки его потоков. Адресное пространство, выделенное для процесса, ограничивает раз-Мер стека, ограничивая тем самым размер, который может иметь каждый поток. На Рис.4.3 показана схема процесса, который содержит два потока.

Как показано на рис. 4.3, процесс содержит два потока А и В, и их стеки располо жены в стековом разделе процесса. Потоки выполняют различные функции: поток А вы п олняет функцию func1(), а поток В — функцию func2().

Рис. 4.3. Схема процесса, содержащего два потока (SP — указатель стека, PC — счетчик команд)

Таблица 4.2. Преимущества и недостатки потоков

Для переключения контекста требуется меньше системных ресурсов

Потоки способны повысить производительность приложения

Для обеспечения взаимодействия между потоками никакого специального механизма не требуется

Благодаря потокам структуру программы можно упростить

Для параллельного доступа к памяти (чтения или записи данных) требуется синхронизация

Потоки могут разрушить адресное пространство своего процесса

Потоки существуют в рамках только одного процесса, поэтому их нельзя повторно использовать

Атрибуты процесса содержат информацию, которая описывает процесс для операционной системы. Операционная система использует эту информацию для управления процессами, а также для того, чтобы отличать один процесс от другого. Процесс совместно использует со своими потоками практически все, включая ресурсы и переменные среды. Разделы данных, раздел программного кода и все ресурсы связаны с процессом, а не с потоками. Все, что нужно для функционирования потока, определяется и предоставляется процессом. Потоки же отличаются один от другого идентификационным номером (id), набором регистров, определяющих состояние потока, его приоритетом и стеком. Именно эти атрибуты формируют уникальность каждого потока. Как и при использовании процессов, информация о потоках хранится в структурах данных и возвращается функциями, поддерживаемыми операционной системой. Например, часть информации о потоке содержится в структуре, именуемой информационным блоком потока, который создается вместе с потоком.

Идентификационный номер (id) потока — это уникальное значение, которое идентифицирует каждый поток во время его существования в процессе. Приоритет потока определяет, каким потокам предоставлен привилегированный доступ к процессору в выделенное время. Под состоянием потока понимаются условия, в которых он пребывает в любой момент времени. Набор регистров для потока включает программный счетчик и указатель стека. Программный счетчик содержит адрес инструкции, которую поток должен выполнить, а указатель стека ссылается на вершину стека потока.

Библиотека потоков POSIX определяет объект атрибутов потока, инкапсулирующий свойства потока, к которым его создатель может получить доступ и модифицировать их. Объект атрибутов потока определяет следующие компоненты:

• стратегия планирования и параметры.

Объект атрибутов потока может быть связан с одним или несколькими потоками. При использовании этого объекта поведение потока или группы потоков определяется профилем. Все потоки, которые используют объект атрибутов, приобретают все свойства, определенные этим объектом. На рис. 4.3 показаны атрибуты, связанные с каждым потоком. Как видите, оба потока (А и В) разделяют объект атрибутов, но они поддерживают свои отдельные идентификационные номера и наборы регистров. После того как объект атрибутов создан и инициализирован, его можно использовать в любых обращениях к функциям создания потоков. Следовательно, можно создать группу потоков, которые будут иметь «малый стек и низкий приоритет» или «большой стек, высокий приоритет и состояние открепления». Открепленный (detached) поток — это поток, который не синхронизирован с другими потоками в процессе. Иначе говоря, не существует потоков, которые бы ожидали до тех пор, пока завершит выполнение открепленный поток. Следовательно, если уж такой поток существует, то его ресурсы (а именно id потока) немедленно принимаются на повторное использование. [8] Для установки и считывания значений этих атрибутов предусмотрены специальные методы. После создания потока его атрибуты нельзя изменить до тех пор, пока он существует.

Атрибут области видимости описывает, с какими потоками конкретный поток конкурирует за обладание системными ресурсами. Потоки соперничают за ресурсы в рамках двух областей видимости: процесса (потоки одного процесса) и системы (все потоки в системе). Конкуренция потоков в пределах одного и того же процесса происходит за дескрипторы файлов, а конкуренция потоков в масштабе всей системы — за ресурсы, которые выделяются системой (например, реальная память). Потоки соперничают с потоками, которые имеют область видимости процесса, и потоками из других процессов за использование процессора в зависимости от состязательного режима и областей выделения ресурсов (набора процессоров). Поток, обладающий системной областью видимости, будет обслуживаться с учетом его приоритета и стратегии планирования, которая действует для всех потоков в масштабе всей системы. Члены POSIX-объекта атрибутов потока перечислены в табл. 4.3.

Таблица 4.3. Члены объекта атрибутов потока

Атрибуты Функции Описание
detachstate int pthread_attr_ setdetachstate (pthread_attr_t *attr, int detachstate); Атрибут detachstate определяет, является ли новый поток открепленным. Если это соответствует истине, то его нельзя объединить ни с каким другим потоком
guardsize int pthread_attr_ setguardsize (pthread_attr_t *attr, size_t guardsize) Атрибут guardsize позволяет управлять размером защитной области стека нового потока. Он создает буферную зону размером guardsize на переполненяемом конце стека
inheritsched int pthread_attr_ setinheritsched (pthread_attr_t *attr, int inheritsched) Атрибут inheritsched определяет, как будут установлены атрибуты планирования для нового потока, т.е. будут ли они унаследованы от потока-создателя или установлены атрибутным объектом
param int pthread_attr_ setschedparam (pthread_attr_t *restrict attr, const struct sched_param *restrict param);
schedpolicy int pthread_attr_ setschedpolicy (pthread_attr_t *attr, int policy);
contentionscope int pthread_attr_ setscope (pthread_attr_t *attr, int contentionscope);
stackaddr int pthread_attr_ setstackaddr (pthread_attr_t *attr, void *stackaddr);
int pthread_attr_ setstack (pthread_attr_t
*attr, void *stackaddr, size_t stacksize)j
stacksize int pthread_attr_ setstacksize (pthread_attr_t *attr, size_t stacksize),
int pthread_attr_ setstack (pthread_attr_t *attr, void *stackaddr, size_t stacksize)j

Атрибут param— это структура, которую можно использовать для установки приоритета нового потока

Атрибут schedpolicy определяет стратегию планирования создаваемого потока

Атрибут contentionscope определяет, с каким множеством потоков будет соперничать создаваемый поток за использование процессорного времени. Область видимости процесса означает, что поток будет состязаться со множеством потоков одного процесса, а область видимости системы означает, что поток будет состязаться с потоками в масштабе всей системы (т.е. сюда входят потоки других процессов)

Атрибуты stackaddr и stacksize определяют базовый адрес и минимальный размер (в байтах) стека, выделяемого для создаваемого потока, соответственно

Атрибут stackaddr определяет базовый адрес стека, выделяемого для создаваемого потока

Атрибут stacksize определяет минимальный размер стека в байтах, выделяемого для создаваемого потока

Когда подходит время для выполнения процесса, процессор занимает один из его оков. Если процесс имеет только один поток, то именно он (т.е. основной поток) назначается процессору. Если процесс содержит несколько потоков и в системе есть достаточно е количе ство процессоров, то процессорам назначаются все потоки. Потоки соперничают-за процессор либо со всеми потоками из активного процесса системы, либо только

с потоками из одного процесса. Потоки помещаются в очереди готовых потоков, отсортированные по значению их приоритета. Потоки с одинаковым приоритетом назначаются процессорам в соответствии с некоторой стратегией планирования. Если система не содержит достаточного количества процессоров, поток с более высоким приоритетом может выгрузить поток, выполняющийся в данный момент. Если новый активный поток принадлежит тому же процессу, что и выгруженный, возникает переключение контекста потоков. Если же новый активный поток «родом» из другого процесса, то сначала происходит переключение контекста процессов, а затем — контекста потоков.

Потоки имеют такие же состояния и переходы между ними (см. главу 3), как и процессы. Диаграмма состояний, показанная на рис. 4.4, — это копия диаграммы, изображенной на рис. 3.4 из главы 3. (Вспомним, что процесс может пребывать в одном из четырех состояний: готовности, выполнения, останова и ожидания, или блокирования.) Состояние потока — это режим или условия, в которых поток существует в данный момент. Поток находится в состоянии готовности (работоспособности), когда он готов к выполнению. Все готовые к работе потоки помещаются в очереди готовности, причем в каждой такой очереди содержатся потоки с одинаковым приоритетом. Когда поток выбирается из очереди готовности и назначается процессору, он (поток) переходит в состояние выполнения. Поток снимается с процессора, если его квант времени истек, или если перешел в состояние готовности поток с более высоким приоритетом. Выгруженный поток снова помещается в очередь готовых потоков. Поток пребывает в состоянии ожидания, если он ожидает наступления некоторого события или завершения операции ввода-вывода. Поток прекращает выполнение, получив сигнал останова, и остается в этом состоянии до тех пор, пока не получит сигнал продолжить работу.

Рис. 4.4. Состояния потоков и переходы между ними

При получении этого сигнала поток переходит из состояния останова в состояние готовности. Переход потока из одного состояния в другое является своего рода сигналом о наступлении некоторого события. Переход конкретного потока из состояния готовности в со стояние выполнения происходит потому, что система выбрала именно его для выполнения, т.е. поток отправляется (dispatched) на процессор. Поток снимается, или выгружается (preempted), с процессора, если он делает запрос на ввод-вывод данных (или какой-либо иной запрос к ядру), или если существуют какие-то причины внешнего характера.

Один поток может определить состояние всего процесса. Состояние процесса с одним потоком синонимично состоянию его основного потока. Если его основной поток находится в состоянии ожидания, значит, и весь процесс находится в состоянии ожидания. Если основной поток выполняется, значит, и процесс выполняется. Что касается процесса с несколькими потоками, то для того, чтобы утверждать, что весь процесс находится в состоянии ожидания или останова, необходимо, чтобы все его потоки пребывали в состоянии ожидания или останова. Но если хотя бы один из его потоков активен (т.е. готов к выполнению или выполняется), процесс считается активным.

Планирование потоков и область конкуренции

Область конкуренции потоков определяет, с каким множеством потоков будет соперничать рассматриваемый поток за использование процессорного времени. Если поток имеет область конкуренции уровня процесса, он будет соперничать за ресурсы с потоками того же самого процесса. Если же поток имеет системную область конкуренции, он будет соперничать за процессорный ресурс с равными ему по правам потоками (из одного с ним процесса) и с потоками других процессов. Пусть, например, как показано на рис. 4.5, существуют два процесса в мультипроцессорной среде, которая включает три процессора. Процесс А имеет четыре потока, а процесс В — три. Для процесса А «расстановка сил» такова: три (из четырех его потоков) имеют область конкуренции уровня процесса, а один— уровня системы. Для процесса В такая «картина»: два (из трех его потоков) имеют область конкуренции уровня процесса, а один— уровня системы. Потоки процесса А с процессной областью конкуренции соперничают за процессор А, а потоки процесса В с такой же (процессной) областью конкуренции соперничают за процессор С. Потоки процессов А и В с системной областью конкуренции соперничают за процессор В.

ПРИМЕЧАНИЕ: потоки при моделировании их реального поведения в приложении Должны иметь системную область конкуренции.

Стратегия планирования и приоритет

Стратегия планирования и приоритет процесса принадлежат основному потоку. Каждый поток (независимо от основного) может иметь собственную стратегию планирования и приоритет. Потокам присваиваются целочисленные значения приоритета, к оторые лежат в диапазоне между заданными минимальным и максимальным значения- м и. Схема приоритетов используется при определении, какой поток следует назначить п роцессору: поток с более высоким приоритетом выполняется раньше потока с более н изким приоритетом. После назначения потокам приоритетов задачам, которые тре буют немедленного выполнения или ответа от системы, предоставляется необходимое

процессорное время. В операционной системе с приоритетами выполняющийся поток снимается с процессора, если в состояние готовности переходит поток с более высоким приоритетом, обладающий при этом тем же уровнем области конкуренции. Например, как показано на рис. 4.5, потоки с процессной областью конкуренции соревнуются за процессор с потоками того же процесса, имеющими такой же уровень области конкуренции. Процесс А имеет два потока с приоритетом 3, и один из них назначен процессору. Как только поток с приоритетом 2 заявит о своей готовности, активный поток будет вытеснен, а процессор займет поток с более высоким приоритетом. Кроме того, в процессе В есть два потока (процессной области конкуренции) с приоритетом 1 (приоритет 1 выше приоритета 2). Один из этих потоков назначается процессору. И хотя другой поток с приоритетом 1 готов к выполнению, он не вытеснит поток с приоритетом 2 из процесса А, поскольку эти потоки соперничают за процессор в рамках своих процессов. Потоки с системной областью конкуренции и более низким приоритетом не вытесняются ни одним из потоков из процессов А или В. Они соперничают за процессорное время только с потоками, имеющими системную область конкуренции.

Рис. 4.5. Планирование потоковпроцессной и системной областями конкуренции) в мультипроцессорной среде

Как упоминалось в главе 3, очереди готовности организованы в виде отсортированных списков, в которых каждый элемент представляет собой уровень приоритета. Под уровнем приоритета понимается очередь потоков с одинаковым значением приоритета. Все потоки одного уровня приоритета назначаются процессору с использованием стратегии планирования: FIFO (сокр. от First In First OuU т.е. первым прибыл, первым обслужен), RR (сокр. от round-robin, т.е. циклическая) или какой-либо другой. При использовании стратегии планирования FIFO поток, квант процессорного времени которого истек, помещается в головную часть очереди соответствующего приоритетного уровня, а процесс назначается следующему потоку из очереди. Следовательно, поток будет выполняться до тех пор, пока он не завершит выполнение, не перейдет в состояние ожидания («заснет») или не получит сигнал остановиться. Когда «спящий» поток «просыпается», он помещается в конец очереди соответствующего приоритетного уровня. Стратегия планирования RR аналогична FIFO стратегии, за исключением того, что по истечении кванта процессорного времени поток помещается не в начало, а в конец «своей» очереди. Циклическая стратегия планирования (RR) считает все потоки обладающими одинаковыми приоритетами и каждому потоку предоставляет процессор только в течение некоторого кванта времени. Поэтому выполнение задач получается попеременным. Например, программа, которая выполняет поиск файлов по заданным ключевым словам, разбивается на два потока. Один поток (1) находит все файлы с заданным расширением и помещает их пути в контейнер. Второй поток (2) выбирает имена файлов из контейнера, просматривает каждый файл на предмет наличия в нем заданных ключевых слов, а затем записывает имена файлов, которые содержат такие слова. Если к этим потокам применить циклическую стратегию планирования с единственным процессором, то поток 1 использовал бы свой квант времени для поиска файлов и вставки их путей в контейнер. Поток 2 использовал бы свой квант времени для выделения имен файлов и поиска заданных ключевых слов. В идеальном мире потоки 1 и 2 должны выполняться попеременно. Но в действительности все может быть иначе. Например, поток 2 может выполниться до потока 1, когда в контейнере еще нет ни одного файла, или поток 1 может так долго искать файл, что до истечения кванта времени не успеет записать его путь в контейнер. Такая ситуация требует синхронизации, краткое рассмотрение которой приводится ниже в этой главе и в главе 5. Стратегия планирования FIFO позволяет каждому потоку выполняться до завершения. Если рассмотреть тот же пример с использованием FIFO-стратегии, то поток 1 будет иметь достаточно времени, чтобы отыскать все нужные файлы и вставить их пути в контейнер. Поток 2 затем выделит имена файлов и выполнит поиск заданных ключевых слов. В идеальном мире завершение выполнения потока 2 будет означать завершение программы в целом. Но в реальном мире поток 2 может быть назначен процессору до потока 1, когда контейнер еще не будет содержать файлов для поиска в них ключевых слов. После «холостого» выполнения потока 2 процессору будет назначен поток 1, который может успешно отыскать нужные файлы и поместить в контейнер их пути. Однако поиск ключевых слов выполнять уже будет некому. Поэтому программа в целом потерпит фиаско. При использовании FIFO-стратегии не предусматривается перемешивания задач. Поток, назначенный процессору, занимает его До полного выполнения своей задачи. Такую стратегию планирования можно использовать для приложений, в которых потоки необходимо выполнить как можно скорее.

°Д Другими» стратегиями планирования подразумеваются уже рассмотренные, но с небольшими вариациями. Например, FIFO-стратегия может быть изменена таким

разом, чтобы позволить разблокировать потоки, выбранные случайно.

Изменение приоритета потоков

Приоритеты потоков следует менять, чтобы ускорить выполнение потоков, от которых зависит выполнение других потоков. И, наоборот, этого не следует делать ради того, чтобы какой-то конкретный поток получил больше процессорного времени. Это может изменить общую производительность системы. Потоки с более высоким классом приоритета получают больше процессорного времени, чем потоки с более низким классом приоритета, поскольку они выполняются чаще. Потоки с более высоким приоритетом практически монополизируют процессор, не выделяя потокам с более низким приоритетом такое ценное процессорное время. Эта ситуация получила название информационного голода (starvation). Системы, в которых используются механизмы динамического назначения приоритетов, реагируют на подобную ситуацию путем назначения приоритетов, которые бы действовали в течение коротких периодов времени. Система регулирует приоритет потоков таким образом, чтобы потоки с более низким приоритетом увеличили время выполнения. Такой подход должен повысить общую производительность системы.

Гарантировать, что конкретный процесс или поток будет выполняться до его полного завершения, — все равно что присвоить ему самый высокий приоритет. Однако реализация такой стратегии может повлиять на общую производительность системы. Такие привилегированные потоки могут нарушить взаимодействие программных компонентов через сетевые средства коммуникации, вызвав потерю данных. На потоки, которые управляют интерфейсом пользователя, может быть оказано чрезмерно большое влияние, выраженное в замедлении реакции на использование клавиатуры, мыши или экрана. В некоторых системах пользовательским процессам или потокам не назначается более высокий приоритет, чем системным процессам. В противном случае системные процессы или потоки не смогли бы реагировать на критические изменения в системе. Поэтому большинство пользовательских процессов и потоков попадают в категорию программных компонентов с нормальным (средним) приоритетом.

Потоки используют большую часть своих ресурсов вместе с другими потоками из того же процесса. Собственные ресурсы потока определяют его контекст. Так, в контекст потока входят его идентификационный номер, набор регистров (включающих указатель стека и программный счетчик) и стек. Остальные ресурсы (процессор, память и файловые дескрипторы), необходимые потоку для выполнения его задачи, он должен разделять с другими потоками. Дескрипторы файлов выделяются каждому процессу в отдельности, и потоки одного процесса соревнуются за доступ к этим дескрипторам. Что касается памяти, процессора и других глобально распределяемых ресурсов, то за доступ к ним потоки конкурируют с другими потоками своего процесса, а также с потоками других процессов.

Поток при выполнении может запрашивать дополнительные ресурсы, например, файлы или мьютексы, но они становятся доступными для всех потоков процесса. Существуют ограничения на ресурсы, которые может использовать один процесс. Таким образом, все потоки в общей сложности не должны превышать предельный объем ресурсов, выделяемых процессу. Если поток попытается расходовать больше ресурсов, чем предусмотрено предельным объемом, формируется сигнал о том, что достигнут предельный объем ресурсов для данного процесса. Потоки, которые используют ресурсы, должны следить за тем, чтобы эти ресурсы не оставались в нестабильном состоянии после их аннулирования. Поток, который открыл файл или создал мьютекс, может завершиться, оставив этот файл открытым или мьютекс заблокированным. Если приложение завершилось, а файл не был закрыт надлежащим образом, это может привести к его разрушению или потере данных. Завершение потока после блокировки мьютекса надежно запирает доступ к критическому разделу, который находится под контролем этого мьютекса. Перед завершением поток должен выполнить некоторые действия очистительно-восстановительного характера, чтобы Н е допустить возникновения нежелательных ситуаций.

Модели создания и функционирования потоков

Цель потока— выполнить некоторую работу от имени процесса. Если процесс содержит несколько потоков, каждый поток выполняет некоторые подзадачи как части общей задачи, выполняемой процессом. Потокам делегируется работа в соответствии с конкретной стратегией, которая определяет, каким образом реализуется делегирование работы. Если приложение моделирует некоторую процедуру или объект, то выбранная стратегия должна отражать эту модель. Используются следующие распространенные модели:

• сеть с равноправными узлами;

Каждая модель характеризуется собственной декомпозицией работ (Work Breakdown Structure — WBS), которая определяет, кто отвечает за создание потоков и при каких условиях они создаются. Например, существует централизованный подход, при котором один поток создает другие потоки и каждому из них делегирует некоторую работу. Существует также конвейерный (assembly-line) подход, при котором на различных этапах потоки выполняют различную работу. Созданные потоки могут выполнять одну и ту же задачу на различных наборах данных, различные задачи на одном и том же наборе данных или различные задачи на различных наборах данных. Потоки подразделяются на категории по выполнению задач только определенного типа. Например, можно создать группы потоков, которые будут выполнять только вычисления, только ввод или только вывод данных.

Возможны задачи, для успешного решения которых следует комбинировать перечисленные выше модели. В главе 3 мы рассматривали процесс визуализации. За-Дачи 1, 2 и 3 выполнялись последовательно, а задачи 4, 5 и 6 могли выполняться параллельно. Все задачи можно выполнить различными потоками. Если необходимо визуализировать несколько изображений, потоки 1, 2 и 3 могут сформировать конвейер. По завершении потока 1 изображение передается потоку 2, в то время к ак поток 1 может выполнять свою работу над следующим изображением. После буферизации изображений потоки 4, 5 и 6 могут реализовать параллельную обработку. Модель функционирования потоков представляет собой часть структурирования па раллелизма в приложении, в котором каждый поток может выполняться на отдельном процессоре. Модели функционирования потоков (и их краткое описание) приведены в табл. 4.4.

Таблица 4.4. Модели функционирования потоков

Модель с равно-правными узлами

Центральный поток («управляющий») создает потоки («рабочие»), назначая каждому из них задачу. Управляющий поток может ожидать до тех пор, пока все потоки не завершат выполнение своих задач

Все потоки имеют одинаковый рабочий статус. Такие потоки называются равноправными Поток создает все потоки, необходимые для выполнения задач, но не осуществляет никакого делегирования ответственности. Равноправные потоки могут обрабатывать запросы от одного входного потока данных, разделяемого всеми потоками, или каждый поток может иметь собственный входной поток данных

Конвейерный подход применяется для поэтапной обработки потока входных данных. Каждый этап — это поток, который выполняет работу на некоторой совокупности входных данных. Когда эта совокупность пройдет все этапы, обработка всего потока данных будет завершена

Поток-«изготовитель» готовит данные , потребляемые потоком- «потребителем». Данные сохраняются в блоке памяти, разделяемом потоками — «изготовителем» и «потребителем»

В модели делегирования один поток («управляющий») создает потоки («рабочие») и назначает каждому из них задачу. Управляющему потоку нужно ожидать до тех пор, пока все потоки не завершат выполнение своих задач. Управляющий поток делегирует задачу, которую каждый рабочий поток должен выполнить, путем задания некоторой функции. Вместе с задачей на рабочий поток возлагается и ответственность за ее выполнение и получение результатов. Кроме того, на этапе получения результатов возможна синхронизация действий с управляющим (или другим) потоком.

Управляющий поток может создавать рабочие потоки в результате запросов, обращенных к системе. При этом обработка запроса каждого типа может быть делегирована рабочему потоку. В этом случае управляющий поток выполняет некоторый цикл событий. По мере возникновения событий рабочие потоки создаются и на них тут же возлагаются определенные обязанности. Для каждого нового запроса, обращенного к системе, создается новый поток. При использовании такого подхода процесс может превысить предельный объем выделенных ему ресурсов или предельное количество потоков. В качестве альтернативного варианта управляющий поток может создать пул потоков, которым будут переназначаться новые запросы. Управляющий поток создает во время инициализации некоторое количество потоков, а затем каждый поток приостанавливается до тех пор, пока не будет добавлен запрос в их очередь. По мере размещения запросов в очереди управляющий поток сигнализирует рабочему о необходимости обработки запроса. Как только поток справится со своей задачей, он извлекает из очереди следующий запрос. Если в очереди больше нет доступных запросов, поток приостанавливается до тех пор. пока управляющий поток не просигналит ему о появлении очередного задания в очереди. Если все рабочие потоки должны разделять одну очередь, то их можно

программировать на обработку запросов только определенного типа. Если тип запроса в очереди не совпадает с типом запросов, на обработку которых ориентирован данный поток, то он может снова приостановиться. Главная цель управляю-потока — создать все потоки, поместить задания в очередь и «разбудить» рабочие потоки, когда эти задания станут доступными. Рабочие потоки справляются о наличии запроса в очереди, выполняют назначенную задачу и приостанавливаются сами, если для них больше нет работы. Все рабочие и управляющий потоки выполняются параллельно. Описанные два подхода к построению модели делегирования представлены для сравнения на рис. 4.6.

Модель с равноправными узлами

Если в модели делегирования есть управляющий поток, который делегирует задачи рабочим потокам, то в модели с равноправными узлами все потоки имеют одинаковый рабочий статус. Несмотря на существование одного потока, который изначально создает все потоки, необходимые для выполнения всех задач, этот поток считается рабочим потоком, но он не выполняет никаких функций по делегированию задач. В этой модели нет никакого централизованного потока, но на рабочие потоки возлагается большая ответственность. Все равноправные потоки могут обрабатывать запросы из одного входного потока данных, либо каждый рабочий поток может иметь собственный входной поток данных, за который он отвечает. Входной поток данных может также храниться в файле или базе данных. Рабочие потоки могут нуждаться во взаимодействии и разделении ресурсов. Модель равноправных потоков представлена на рис. 4.7.

Рис. 4.7. Модель равноправных потоков (или модель с равноправными узлами)

Модель конвейера подобна ленте сборочного конвейера в том, что она предполагает наличие потока элементов, которые обрабатываются поэтапно. На каждом этапе отдельный поток выполняет некоторые операции над определенной совокупностью входных данных. Когда эта совокупность данных пройдет все этапы, обработка всего входного потока данных будет завершена. Этот подход позволяет обрабатывать несколько входных потоков одновременно. Каждый поток отвечает за получение промежуточных результатов, делая их доступными для следующего этапа (или следующего потока) конвейера Последний этап (или поток) генерирует результаты работы конвейера в целом. По мере того как входные данные проходят по конвейеру, не исключено, что некоторые их порции придется буферизировать на определенных этапах, пока потоки еще занимаются обработкой предыдущих порций. Это может вызвать торможение конвейера, если окажется, что обработка данных на каком-то этапе происходит медленнее, чем на других. При этом образуется отставание в работе. Чтобы предотвратить отставание, можно для «слабого» этапа создать дополнительные потоки. Все этапы конвейера должны быть уравновешены по времени, чтобы ни один этап не занимал больше времени, чем другие. Для этого необходимо всю работу распределить по конвейеру равномерно. Чем больше этапов в конвейере, тем больше должно быть создано потоков обработки. Увеличение количества потоков также может способствовать предотвращению отставаний в работе. Модель конвейера представлена на рис. 4.8.


Рис. 4.8. Модель конвейера

В модели «изготовитель-потребитель» существует поток-«изготовитель», который готовит данные, потребляемые потоком-«потребителем». Данные сохраняются в блоке памяти, разделяемом между потоками «изготовителем» и «потребителем». Поток-изготовитель» должен сначала приготовить данные, которые затем поток-^потребитель» получит. Такому процессу необходима синхронизация. Если поток-изготовитель» будет поставлять данные гораздо быстрее, чем поток-«потребитель» сможет их потреблять, поток-«изготовитель» несколько раз перезапишет результаты, полученные им ранее, прежде чем поток-«потребитель» успеет их обработать. Но если поток-«потребитель» будет принимать данные гораздо быстрее, чем поток-изготовитель» сможет их поставлять, поток-«потребитель» будет либо снова обрабатывать уже обработанные им данные, либо попытается принять еще не подготовленные данные. Модель «изготовитель-потребитель» представлена на рис. 4.9.

Модели SPMD и МРМD для потоков

В каждой из описанных выше моделей потоки вновь и вновь выполняют одну и ту задачу на различных наборах данных или им назначаются различные задачи для выполнения на различных наборах данных. Эти потоковые модели используют схемы (Single-Program, Multiple-Data — одна программа, несколько потоков данных) и MPMD (Multiple-Programs, Multiple-Data — множество программ, множество потоков данных). Эти схемы представляют собой модели параллелизма, которые делят программы на потоки инструкций и данных. Их можно использовать для описания типа работы, которую реализуют потоковые модели с использованием параллелизма. В контексте нашего изложения материала модель MPMD лучше представить как модель MTMD (Multiple-Threads, Multiple-Data— множество потоков выполнения, множество потоков данных). Эта модель описывает систему с различными потоками выполнения (thread), которые обрабатывают различные наборы данных, или потоки данных (stream). Аналогично модель SPMD нам лучше рассматривать как модель STMD (Single-Thread, Multiple-Data — один поток выполнения, несколько потоков данных). Эта модель описывает систему с одним потоком выполнения, который обрабатывает различные наборы, или потоки, данных. Это означает, что различные наборы данных обрабатываются несколькими идентичными потоками выполнения (вызывающими одну и ту же подпрограмму).

Рис. 4.9. Модель конвейера

Как модель делегирования, так и модель равноправных потоков могут использовать модели параллелизма STMD и MTMD. Как было описано выше, пул потоков может выполнять различные подпрограммы для обработки различных наборов данных. Такое поведение соответствует модели MTMD. Пул потоков может быть также настроен на выполнение одной и той же подпрограммы. Запросы (или задания), отсылаемые системе, могут представлять собой различные наборы данных, а не различные задачи. И в этом случае поведение множества потоков, реализующих одни и те же инструкции, но на различных наборах данных, соответствует модели STMD. Модель равноправных потоков может быть реализована в виде потоков, выполняющих одинаковые или различные задачи. Каждый поток выполнения может иметь собственный поток данных или несколько файлов сданными, предназначенных для обработки каждым потоком. В модели конвейера используется МТМГ>модель параллелизма. На разных этапах выполняются различные виды обработки, поэтому в любой момент времени различные совокупности входных данных будут находиться на различных этапах выполнения. Модельное представление конвейера было бы бесполезным, если бы на каждом этапе выполнялась одна и та же обработка. Модели STMD и MTMD представлены на рис. 4.10.

Введение в библиотеку Pthread

Библиотека Pthread предоставляет API-интерфейс для создания и управления потоками в приложении. Библиотека Pthread основана на стандартизированном интерфейсе программирования, который был определен комитетом по выпуску стандартов IEEE в стандарте POSIX 1003.1с. Сторонние фирмы-изготовители придерживаются стандарта POSIX в реализациях, которые именуются библиотеками потоков Pthread или POSIX.

Рис. 4.10. Модели параллелизма STMD и MTMD

Библиотека Pthread содержит более 60 функций, которые можно разделить на следующие категории.

1. Функции управления потоками.

1.1. Конфигурирование потоков.

1.2. Отмена потоков.

1.3. Стратегии планирования потоков.

1.4. Доступ к данным потоков.

1.5. Обработка сигналов.

1.6. Функции доступа к атрибутам потоков.

1.6.1. Конфигурирование атрибутов потоков.

1.6.2. Конфигурирование атрибутов, относящихся к стекам потоков.

1.6.3. Конфигурирование атрибутов, относящихся к стратегиям планирования потоков.

2. Функции управления мьютексами.

2.1. Конфигурирование мьютексов.

2.2. Управление приоритетами.

2.3. Функции доступа к атрибутам мьютексов.

2.3.1. Конфигурирование атрибутов мьютексов.«

2.3.2. Конфигурирование атрибутов, относящихся к протоколам мьютексов.

2.3.3. Конфигурирование атрибутов, относящихся к управлению приоритетами мьютексов.

3. Функции управления условными переменными.

3.1. Конфигурирование условных переменных.

3.2. Функции доступа к атрибутам условных переменных.

3.2.1. Конфигурирование атрибутов условных переменных.

3.2.2. Функции совместного использования условных переменных.

Библиотека Pthread может быть реализована на любом языке, но для соответствия стандарту POSIX она должна быть согласована со стандартизированным интерфейсом. Библиотека Pthread — не единственная реализация потокового API-интерфейса Существуют другие реализации, созданные сторонними фирмами-производителями аппаратных и программных средств. Например, среда Sun поддерживает библиотеку Pthread и собственный вариант библиотеки потоков Solaris. В этой главе мы рассмотрим некоторые функции библиотеки Pthread, которые реализуют управление потоками.

Анатомия простой многопоточной программы

Любая простая многопоточная программа должна состоять из основного потока и функций, которые будут выполнять другие потоки. Выбранная для реализации модель создания и функционирования потоков определяет, каким образом в программе будут созда ваться потоки и как будет осуществляться управление ими. Потоки создаются по принципу «все и сразу» или при определенных условиях. Пример простой многопоточной программы, в которой реализована модель делегирования, представлен в листинге 4.1.

// Листинг 4.1. Использование модели делегирования в

void *task1(void *X) //define task to be executed by ThreadA

Для компиляции многопоточного приложения в средах UNIX или Linux с помощью компиляторов командной строки g++ или gcc необходимо скомпоновать его с библиотекой Pthreads. Для задания библиотеки используйте опцию -l. Так, команда -lpthread обеспечит компоновку вашего приложения с библиотекой, которая согласуется с многопоточным интерфейсом, определенным стандартом POSIX 1003.1с. Библиотеку Pthread, libpthread.so , следует поместить в каталог, в котором хранится системная стандартная библиотека, обычно это /usr/lib. Если она будет находиться не в стандартном каталоге, то для того, чтобы обеспечить поиск компилятора в заданном каталоге до поиска в стандартных, используйте опцию -L. По команде g++ -о blackboard -L /src/local/lib blackboard.cpp -lpthread компилятор выполнит поиск библиотеки Pthread сначала в каталоге /src/local/lib, а затем в стандартных каталогах.

Законченные программы, представленные в этой книге, сопровождаются профилем. Профиль программы содержит такие специальные сведения по ее реализации, как необходимые заголовки и библиотеки, а также инструкции по компиляции и компоновке. Профиль программы также включает раздел примечаний, содержащий специальную информацию, которую необходимо учитывать при выполнении программы.

Библиотека Pthreads используется для создания, поддержки и управления потоками многопоточных программ и приложений. При создании многопоточной программы потоки могут создаваться на любом этапе выполнения процесса, поскольку это — динамические образования. Функция pthread_create() создает новый поток в адресном пространстве процесса. Параметр thread указывает на дескриптор, или идентификатор (id), создаваемого потока. Новый поток будет иметь атрибуты, заданные объектом attr. Созданный поток немедленно приступит к выполнению инструкций, заданных параметром start_routine с использованием аргументов, заданных параметром arg. При успешном создании потока функция возвращает его идентификатор (id), значение которого сохраняется в параметре thread.

int pthread_create(pthread_t *restrict thread,

const pthread_attr_t *restrict attr,

Если параметр attr содержит значение NULL, новый поток будет использовать атрибуты, действующие по умолчанию. В противном случае новый поток использует атрибуты, заданные параметром attr при его создании. Если же значение параметра attr изменится после того, как поток был создан, это никак не отразится на его атрибутах. При завершении параметра-функции start_routine завершается и поток, причем так, как будто была вызвана функция pthread_exit() с использованием в качестве статуса завершения значения, возвращаемого функцией start_routine.

При успешном завершении функция возвращает число 0 . В противном случае по-не создается, и функция возвращает код ошибки. Если в системе отсутствуют ресурсы для создания потока, или в процессе достигнут предел по количеству возможных потоков, выполнение функции считается неудачным. Неудачным оно также будет в случае, если атрибут потока задан некорректно или если инициатор вызова потока не имеет разрешения на установку необходимых атрибутов потока.

Приведем примеры создания двух потоков с заданными по умолчанию атрибутами:

Это — два вызова функции pthread_create () из листинга 4 .1. Оба потока создаются с атрибутами, действующими по умолчанию.

В программе 4 .1 отображен основной поток, который передает аргумент из командной строки в функции, выполняемые потоками.

Часть 9. Процессы и потоки

Серия контента:

Этот контент является частью # из серии # статей: Программирование на Python

Этот контент является частью серии: Программирование на Python

Следите за выходом новых статей этой серии.

С появлением многоядерных процессоров стала общеупотребительной практика распространять нагрузку на все доступные ядра. Существует два основных подхода в распределении нагрузки: использование процессов и потоков.

Использование нескольких процессов фактически означает использование нескольких программ, которые выполняются независимо друг от друга. Программно это решается с помощью системных вызовов exec и fork. Такой подход создает большие неудобства в управлении обмена данными между этими программами.

В качестве альтернативы существует другой подход – создание многопоточных программ. Обмен данными между потоками существенно упрощается. Но управление такими программами усложняется, и вся ответственность ложится на программиста.

Сегодня мы рассмотрим следующие темы.

  • Как работают процессы.
  • Как работают потоки в питоне.
  • Создание потока.
  • Очереди (Queue).
  • Блокировки (Lock).

1. Как работают процессы

В питоне есть стандартный модуль subprocess, который упрощает управление другими программами, передавая им опции командной строки и организуя обмен данными через каналы (pipe). Мы рассмотрим пример, в котором пользователь запускает программу из командной строки, которая в свою очередь запустит несколько дочерних программ. В данном примере два скрипта – рarent.py и child.py. Запускается parent.py. Child.py выступает в роли аргумента command, который передается в запускаемый процесс. У этого процесса есть стандартный вход, куда мы передаем два аргумента – поисковое слово и имя файла. Мы запустим два экземпляра программы child.py, каждый экземпляр будет искать слово word в своем файле – это будут файлы исходников самих программ. Запись на стандартный вход осуществляет модуль subprocess. Каждый процесс пишет результат своего поиска в консоль. В главном процессе мы ждем, пока все child не закончат свою работу.

2. Как работают потоки в питоне

Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр.

Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку:

а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные.

Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята.

Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее.

Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка.

Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток.

Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением.

3. Создание потока

Для создания потоков мы будем использовать стандартный модуль threading. Есть два варианта создания потоков:

Следующий пример показывает, как к потоку приаттачить функцию через вызов функции:

Пример на создание потока через вызов класса: в конструкторе обязательно нужно вызвать конструктор базового класса. Для запуска потока нужно выполнить метод start() объекта-потока, что приведет к выполнению действий в методе run() :

Для управления потоками существуют методы:


start() – дает потоку жизнь.

run() –этот метод представляет действия, которые должны быть выполнены в потоке.

join([timeout]) – поток, который вызывает этот метод, приостанавливается, ожидая завершения потока, чей метод вызван. Параметр timeout (число с плавающей точкой) позволяет указать время ожидания (в секундах), по истечении которого приостановленный поток продолжает свою работу независимо от завершения потока, чей метод join был вызван. Вызывать join() некоторого потока можно много раз. Поток не может вызвать метод join() самого себя. Также нельзя ожидать завершения еще не запущенного потока.

getName() – возвращает имя потока.

setName(name) – присваивает потоку имя name .

isAlive() – возвращает истину, если поток работает (метод run() уже вызван).

isDaemon() – возвращает истину, если поток имеет признак демона.

setDaemon(daemonic) – устанавливает признак daemonic того, что поток является демоном.

activeCount() – возвращает количество активных в настоящий момент экземпляров класса Thread. Фактически это len(threading.enumerate()) .

currentThread() – возвращает текущий объект-поток, т.е. соответствующий потоку управления, который вызвал эту функцию.

enumerate() – возвращает список активных потоков.

4. Очереди (Queue)

В следующем примере будет решена аналогичная задача, что и в предыдущем примере с процессами: будут запущены три потока, каждый из которых будет работать по принципу утилиты grep . Имеется глобальный ресурс – work_queue – список файлов для поиска, который мы положим в очередь. Для этого будет использован объект Queue , который имеет встроенную блокировку:

5. Блокировки (Lock)

В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – url_list – доступ к которому будет блокироваться с помощью блокировки threading.Lock() . Объект Lock имеет методы:

acquire([blocking=True]) – делает запрос на запирание замка. Если параметр blocking не указан или является истиной, то поток будет ожидать освобождения замка.

Если параметр не был задан, метод не возвратит значения.

Если blocking был задан и истинен, метод возвратит True (после успешного овладения замком).

Если блокировка не требуется (т.е. задан blocking=False ), метод вернет True , если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено False .

release() – запрос на отпирание замка.

locked() – возвращает текущее состояние замка ( True – заперт, False – открыт).

Заключение

Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование. В обычной программе действует всего один поток управления, а в многопоточной одновременно могут работать несколько потоков.

В многопоточной программе усложняется контроль за обменом данных между потоками. Глобальные ресурсы необходимо предохранять от одновременного доступа со стороны нескольких потоков, чтобы не нарушить их целостности. В этой статье были рассмотрены инструменты контроля глобальных данных – блокировки, очереди.

Приведенные примеры проверялись на версии питона 2.6.

Регулирование параллельной обработки в ThreadPool в CLR 4.0

Эрика Фуентес

В ThreadPool в новейшем выпуске CLR (версии 4.0) было внесено несколько серьезных изменений по сравнению с CLR 2.0. Недавние крупные перемены в технологиях — вроде широкого распространения многоядерных архитектур процессоров и естественного появления потребности в распараллеливании существующих приложений или написания нового параллельного кода — стали одной из весомых причин для совершенствования CLR ThreadPool.

В статье «Управление потоками в CLR» (msdn.microsoft.com/magazine/dd252943) под рубрикой «CLR с изнанки» в номере «MSDN Magazine» за январь 2009 г. я обсуждала некоторые из этих причин и сопутствующие вопросы, в том числе управление параллельной обработкой. Теперь я опишу, как мы решили эти задачи в ThreadPool в CLR 4.0, какой выбор был у нас в реализации и как он мог повлиять на поведение общеязыковой исполняющей среды (CLR). Я уделю внимание подходу, принятому для автоматизации управления параллельной обработкой в текущей версии ThreadPool для CLR 4.0 (далее для краткости просто ThreadPool). Кроме того, я дам краткий обзор архитектуры ThreadPool. Моя статья охватывает и детали реализации, подлежащие изменению в будущих версиях. Все это может оказаться полезным для понимания и использования преимуществ поведения текущей версии ThreadPool не только тем, кто проектирует и пишет новые параллельные приложения, но и тем, кто заинтересован в улучшении старых приложений за счет распараллеливания, а также в использовании технологий ASP.NET или Parallel Extension (в контексте CLR 4.0).

Обзор ThreadPool

Пул потоков (thread pool) обеспечивает ключевые сервисы, в частности управление потоками, абстракции различных типов параллельной обработки и регулирование (throttling) параллельных операций. Эти сервисы снимают часть бремени ручной работы с программистов. Для неопытных программистов очень удобно, что не требуется изучать и иметь дело со сложными деталями внутреннего устройства многопоточной среды. Более опытные программисты, располагая надежной системой потоков, могут сосредоточиться на улучшении различных аспектов своих приложений. ThreadPool предоставляет эти сервисы управляемым приложениям и поддерживает перенос между платформами, позволяя выполнять определенные приложения Microsoft .NET Framework, например, в Mac OS.

К разным частям системы могут относиться разные типы параллельной обработки. Наиболее релевантные: процессорный параллелизм, параллелизм ввода-вывода, таймеры и синхронизация, балансировка нагрузки и использование ресурсов. Я кратко обрисую архитектуру ThreadPool в терминах различных аспектов параллельной обработки (подробнее об архитектуре ThreadPool и применении соответствующих API см.«The CLR’s Thread Pool» по ссылке msdn.microsoft.com/magazine/cc164139). Особо стоит отметить, что существует две независимые реализации ThreadPool: одна из них имеет дело с процессорным параллелизмом и называется пулом рабочих потоков (worker ThreadPool), а другая — с параллелизмом ввода-вывода и может быть названа пулом потоков ввода-вывода. В следующем разделе мы сосредоточимся на процессорном параллелизме и соответствующей реализации в ThreadPool, в частности поговорим о стратегиях регулирования параллельной обработки.

Пул рабочих потоков Он предоставляет сервисы на уровне процессорного параллелизма и использует преимущества многоядерных архитектур. В отношении процессорного параллелизма нужно учитывать два важнейших фактора: быстрая и оптимальная диспетчеризация работы и регулирование степени параллелизма. В первом случае реализация ThreadPool использует такие стратегии, как очереди, свободные от блокировок (lock-free queues), чтобы избежать конкуренции и «кражи» работы при балансировке нагрузки; однако эти области выходят за рамки нашего обсуждения (подробно эта тематика освещается по ссылке msdn.microsoft.com/magazine/cc163340). А во втором случае — регулировании степени параллелизма — реализация ThreadPool обеспечивает управление параллельной обработкой, чтобы предотвратить конкуренцию за ресурсы и последующее уменьшение общей пропускной способности.

Процессорный параллелизм может представлять особую проблему, потому что на него влияет множество параметров, скажем, определение того, сколько рабочих элементов можно выполнять одновременно в любой конкретный момент. Другая проблема — количество ядер и как оптимизировать распределение различных типов рабочих нагрузок. Например, в теории оптимальным считается один поток на процессор, но, если рабочая нагрузка постоянно блокируется, процессорное время тратится впустую, и можно было бы использовать большее число потоков для выполнения большего объема работы. Размер и тип рабочей нагрузки на самом деле являются еще одним параметром. Так, в случае блокирующихся рабочих нагрузок крайне трудно определить число потоков, обеспечивающих оптимальную пропускную способность, поскольку сложно вычислить, когда будет завершена обработка запроса (или даже насколько часто будут появляться запросы — это тесно связано с блокировкой ввода-вывода). Соответствующий API в этом ThreadPool — QueueUserWorkItem, который помещает метод (рабочий элемент) в очередь для выполнения (msdn.microsoft.com/library/system.threading.threadpool.queueuserworkitem). Он рекомендуется для использования в приложениях, где есть работа, которую потенциально можно выполнять параллельно (с другими задачами). Эта работа передается ThreadPool, который автоматически «выясняет», когда запускать ее выполнение. Этот механизм снимает с программиста заботу о том, как и когда создавать потоки; но, увы, это не самое эффективное решение на все случаи жизни.

Пул потоков ввода-вывода Эта часть реализации ThreadPool, относящаяся к параллелизму ввода-вывода, обрабатывает блокирующиеся рабочие нагрузки (т. е. запросы на ввод-вывод, обслуживание которых требует сравнительно длительного времени) или асинхронный ввод-вывод.В асинхронных вызовах потоки не блокируются и могут выполнять другую работу в ожидании обслуживания запроса. Этот ThreadPool обеспечивает координацию между запросами и потоками. Пул потоков ввода-вывода, как и пул рабочих потоков, имеет алгоритм для регулирования параллельной обработки; он управляет количеством потоков на основе скорости завершения асинхронных операций. Но этот алгоритм весьма сильно отличается от алгоритма, заложенного в пул рабочих потоков, и выходит за рамки данной статьи.

Параллельная обработка в ThreadPool

Параллельная обработка — задача трудна, но необходимая, так как она прямо влияет на общую производительность системы. Как система регулирует параллельную обработку, сказывается и на других задачах вроде синхронизации, использовании ресурсов и балансировке нагрузки (и наоборот).

Концепция управления параллельной обработкой (concurrency control), или точнее регулирования параллельной обработки (throttling concurrency), относится к тому, сколько потоков разрешается запускать для выполнения работы в конкретное время в ThreadPool; это политика, определяющая, сколько потоков может выполняться одновременно без вреда для производительности. В этой статье управление параллельной обработкой рассматривается лишь применительно к пулу рабочих потоков. Как бы то ни противоречило термину, суть управления параллельной обработкой заключается в регулировании и уменьшении количества рабочих элементов, которые могут выполняться параллельно, с целью увеличения пропускной способности пула рабочих потоков (т. е. управление степенью параллелизма предотвращает выполнение некой части работы).

Алгоритм управления параллельной обработкой в ThreadPool автоматически выбирает уровень параллелизма; он решает за программиста, сколько потоков нужно для поддержания общей производительности на оптимальном уровне. Реализация этого алгоритма — одна из сложнейших и интересных частей ThreadPool. Существуют различные подходы к оптимизации производительности ThreadPool в контексте уровня параллелизма (другими словами, к определению «правильного» количества потоков, выполняемых одновременно). В следующем разделе я опишу некоторые из этих подходов, которые рассматривались или применялись в CLR.

Эволюция управления параллельной обработкой в ThreadPool

Одним из первых предпринятых подходов была оптимизация на основе наблюдаемого использования процессора и добавление потоков для ее максимизации, выполняющих столько работы, сколько было возможно, чтобы процессор не простаивал. Доля использования процессора — полезный показатель, если приходится иметь дело с длительно выполняемыми или варьируемыми рабочими нагрузками. Однако этот подход оказался непригоден, так как критерии оценки данного показателя могли вводить в заблуждение. Например, в приложении, где происходит большой объем операций подкачки памяти. Наблюдаемое использование процессора может оказаться низким, и добавление большего количество потоков в такой ситуации привело бы к использованию большего объема памяти, что в свою очередь еще больше снизило бы нагрузку на процессор. Другая проблема с этим подходом заключается в том, что в случаях, где наблюдается интенсивная конкуренция, процессорное время на самом деле тратится на синхронизацию, а не на выполнение реальной работы, поэтому увеличение числа потоков просто ухудшило бы ситуацию.

Другой идеей было просто оставить за ОС все заботы об уровне параллелизма. По сути, именно это и делает пул потоков ввода-вывода, но пул рабочих потоков требовал более высокого уровня абстракции, чтобы обеспечить большие портируемость и эффективность управления ресурсами. Этот подход мог бы сработать в некоторых ситуациях, но программисту все равно нужно было бы знать, как осуществляется регулировка во избежание перенасыщения ресурсов (например, если созданы тысячи потоков, конкуренция за ресурсы может стать проблемой, а увеличение числа потоков только ухудшит ситуацию). Более того, это подразумевало, что программист должен был бы самостоятельно заботиться о параллельной обработке, что противоречит смыслу пула потоков.

Более новый подход включал концепцию пропускной способности, измеряемой в количестве завершенных рабочих элементов в единицу времени; этот показатель использовался для оптимизации производительности. В этом случае, когда нагрузка на процессор низка, потоки добавляются и одновременно проверяется, повысилась ли пропускная способность. Если да, добавляются еще потоки; не помогает — потоки удаляются. Такой подход более разумен, чем предыдущие, потому что он учитывает, сколько работы было выполнено, а не просто как используются ресурсы. К сожалению, на пропускную способность влияет много факторов — не только количество активных потоков (скажем, размер рабочих элементов), а это сильно затрудняет оптимизацию.

Теория управления для регулирования параллельной обработки

Чтобы преодолеть некоторые ограничения предыдущих реализаций, в CLR 4.0 были заложены новые идеи. Первая методология, учтенная нами, была взята из области теории управления — алгоритм Hill Climbing (HC). Это подход с автоматической оптимизацией на основе петли обратной связи между входом и выходом (input-output feedback loop). Выход в системе наблюдается и измеряется через малые промежутки, чтобы увидеть влияние управляемого входа; эта информация передается обратно в алгоритм для дальнейшей настройки входа. Рассматривая вход и выход как переменные, мы моделируем систему как функцию зависимости от этих переменных. Цель — последующая оптимизация измеряемого выхода.

В контексте системы пула рабочих потоков входом является количество потоков, одновременно выполняющих работу (или уровень параллельной обработки), а выходом — пропускная способность (рис. 1).

Рисунок 1 Петля обратной связи в ThreadPool

Мы наблюдаем и измеряем изменения пропускной способности во времени как результат добавления или удаления потоков, а затем решаем, добавлять или удалять дополнительные потоки, исходя из наблюдаемого уменьшения или повышения пропускной способности. Рис. 2 иллюстрирует идею.

Рис. 2. Общая производительность как функция уровня одновременности выполнения

Представляя пропускную способность как (полиномиальную) функция уровня параллельной обработки, алгоритм добавляет потоки, пока не будет достигнут максимум функции (около 20 в этом примере). С этой точки начнется уменьшение пропускной способности, и алгоритм удалит потоки. Через каждый интервал берется выборка измерений пропускной способности и «усредняется». Затем на ее основе принимается решение для следующего интервала . Понятно, что, если в измерениях много шума, статистическая информация не отражает реальную ситуацию, если только она не была получена за большой промежуток времени. Иначе трудно сказать, было ли улучшение результатом изменения уровня параллелизма или причиной является другой фактор, например флуктуации в рабочей нагрузке.

Адаптивные подходы в реальных системах затруднены; в нашем случае их применение особенно проблематично из-за сложности обнаружения малых вариаций или выборки изменений из очень зашумленной среды за короткий промежуток. Первая проблема с таким подходом заключается в том, что смоделированная функция (см. черную кривую на рис. 2) не является статической на практике (см. серые точки на том же графике), поэтому измерить небольшие изменения трудно. Следующая и, возможно, более серьезная проблема в том, что шум (вариации в измерениях, вызванные системной средой, например определенными операциями ОС, сбором мусора и др.) фактически не позволяет понять, есть ли реальная связь между входом и выходом, т. е. с уверенностью сказать, что пропускная способность является лишь функцией от числа потоков. По сути, в ThreadPool пропускная способность составляет лишь малую часть того, что наблюдается на выходе в действительности — большая его часть является шумом. Например, возьмем приложение, использующее множество потоков для обработки своей рабочей нагрузки. Добавление еще нескольких потоков окажется незаметным на выходе: если в каком-то интервале вы заметите улучшение, не факт, что он связан с изменением в уровне параллельной обработки (эту проблему иллюстрирует рис. 3).

Рисунок 3 Пример шума в ThreadPool

На рис. 3 по оси x отложено время, а по оси y нанесены результаты измерений пропускной способности и уровня параллелизма. Верхний график иллюстрирует, что при некоторых рабочих нагрузках, даже если число потоков (прямая линия) сохраняется постоянным, могут наблюдаться колебания пропускной способности (ломаная кривая). В этом примере флуктуации являются шумом. Нижний график — другой пример, где даже в присутствие шума наблюдается, в целом, увеличение пропускной способности со временем. Но число потоков по-прежнему остается постоянным, поэтому увеличение пропускной способности вызвано каким-то другим фактором, действующим в системе.

В следующем разделе я рассмотрю подход к фильтрации шума.

Введение фильтрации сигнала

Фильтрация сигнала используется во многих областях техники для уменьшения шума в сигналах; идея в том, чтобы найти шаблон входного сигнала в выходном. Эта теория применима и в контексте ThreadPool, если мы интерпретируем вход (уровень параллелизма) и выход (пропускную способность) алгоритма управления параллельной обработкой как сигналы. Если мы вводим намеренно модифицированный уровень параллелизма как «волновой» с известными периодичностью и амплитудой и ищем исходный волновой шаблон на выходе, то можем различить, что является шумом, по реальному влиянию входа на выход. Эту идею иллюстрирует рис. 4.

Рисунок 4 Определение факторов, влияющих на выход ThreadPool

Представьте на минутку, что система является черным ящиком, который генерирует вывод при подаче ввода. На рис. 4 показан упрощенный пример связи входа и выхода (первая половина), а во второй половине дан пример того, как вход и выход могли бы выглядеть при использовании фильтрации.

Вместо подачи линейного постоянного входа мы вводим сигнал, а затем пытаемся найти его в зашумленном выходе. Этот эффект может быть достигнут использованием таких способов, как полосный фильтр (band pass filter) или фильтр совпадений (match filter), обычно применяемых для отделения одних волновых сигналов от других или поиска очень специфических сигналов на выходе. Это также означает, что за счет внесения изменений на входе алгоритм принимает решения в каждый момент, основываясь на последней небольшой порции входных данных.

В частности, алгоритм в ThreadPool использует дискретное преобразование Фурье — методологию, позволяющую получить такую информацию, как величина и фаза волны. Затем на основе этой информации можно понять, как влиял вход на выход и влиял ли вообще. График на рис. 5 показывает пример поведения ThreadPool при использовании этой методологии на рабочей нагрузке, выполняемой более 600 секунд.

Рисунок 5 Измерения влияния входа на выход

В примере на рис. 5 известный шаблон входной волны (фаза, частота и амплитуда) можно отследить и на выходе. График иллюстрирует поведение алгоритма параллельной обработки, использующего фильтрацию выборки рабочей нагрузки. Тренд светлого оттенка соответствует входу, а темного — выходу. Количество потоков с течением времени варьируется, но это не значит, что мы создаем или уничтожаем потоки; мы сохраняем примерно одинаковое их число.

Хотя масштаб количества потоков отличается от такового для пропускной способности, прекрасно видно, что можно сопоставить то, как вход влияет на выход. Число потоков постоянно меняется минимум на единицу за отрезок времени, но это не значит, что был создан или уничтожен какой-то поток. Вместо этого потоки остаются «жить» в пуле — просто они не выполняют никакой работы.

В противоположность первоначальном подходу с применением алгоритма Hill Climbing (HC), где цель заключалась в моделировании графика пропускной способности и принятии решения на основе этих вычислений, более совершенная методология просто определяет, поможет ли изменение на входе что-то улучшить на выходе. На интуитивном уровне растет уверенность в том, что изменения, которые мы искусственно вводим, оказывают различимое влияние на выходе (максимальное число потоков, вводившихся в сигнал до сих пор, равно 20, что вполне разумно — особенно в случаях с большим количеством потоков). Один из недостатков этого подхода, использующего фильтрацию сигнала, — из-за ввода искусственного волнового шаблона оптимальный уровень параллелизма всегда будет отличаться минимум на один поток в ту или другую сторону. Кроме того, регулирование уровня параллелизма происходит сравнительно медленно (более быстрые алгоритмы основываются на показателях использования процессора), так как сначала нужно собрать объем данных, достаточный для стабилизации модели. И скорость этого процесса будет зависеть от размера рабочих элементов.

Этот подход не совершенен; для одних нагрузок работает лучше, для других — хуже, но он значительно эффективнее предыдущих методологий. Лучше всего наш алгоритм действует, когда рабочие элементы выполняются за относительно небольшое время.Дело в том, что чем меньше рабочий элемент, тем быстрее адаптируется алгоритм. Например, он превосходно функционирует, когда длительность выполнения индивидуальных рабочих элементов меньше 250 мс, но становится еще эффективнее, если это время сокращается до значений меньше 10 мс.

Управление параллельной обработкой: мы делаем это за вас

В заключение отмечу, что ThreadPool предоставляет сервисы, помогающие программисту сосредоточиться на своей работе, а не на управлении параллельной обработкой. Чтобы обеспечить такую функциональность, в реализацию ThreadPool включены алгоритмы высочайшего технического уровня, способные автоматически принимать решения за пользователя. Один из примеров — алгоритм управления параллельной обработкой, который развивался с учетом технологий и потребностей из разных сценариев применения.

Задача алгоритма управления параллельной обработкой в CLR 4.0 — автоматически решать, сколько рабочих элементов можно выполнять одновременно без потерь в эффективности, соответственно оптимизируя пропускную способность ThreadPool. Этот алгоритм сложен в тонкой настройке из-за шумовых факторов и таких параметров, как тип рабочей нагрузки; он также опирается на предположение, что каждый рабочий элемент является полезной нагрузкой. На его текущую структуру и поведение сильно повлияли сценарии применения в ASP.NET и Parallel Framework, для которых он обеспечивает хорошую производительность. В целом, ThreadPool эффективно выполняет свою работу. Однако программист должен понимать, что при некоторых рабочих нагрузках или при одновременной работе нескольких пулов рабочих потоков поведение этого алгоритма может стать непредсказуемым.

Эрика Фуентес (Erika Fuentes) — инженер-разработчик ПО и тестировщик в группе CLR, где она работает в группе Performance Team в основном в области Core Operating System. Автор нескольких академический публикаций по научным вычислениям, адаптивным системам и статистике.

Выражаю благодарность за рецензирование статьи Эрику Эйлебрехту (Eric Eilebrecht) и Мохамеду Абд Эль Азизу (Mohamed Abd El Aziz).

Threading — управление параллельными потоками

В этой части документации мы рассмотрим то новое в многопоточном API для использования с многоядерными процессорами, которое появилось в версии Framework 4.0:

• Parallel LINQ или PLINQ.
• Класс Parallel.
• Конструкции для параллельного выполнения задач (task parallelism).
• Коллекции одновременного выполнения (concurrent collections).
• SpinLock и SpinWait.

Это API также известно как PFX (Parallel Framework, аббревиатура ИМХО несколько ошибочная). Класс Parallel вместе с конструкциями параллелизма задач называется библиотекой параллельных задач, или TPL (Task Parallel Library).


Framework 4.0 также добавил несколько низкоуровневых конструкций для потоков, которые выполняют традиционные для многопоточности задачи. Мы их уже рассматривали ранее [5]:

• Конструкции сигнализации с малыми задержками (low-latency signaling constructs) SemaphoreSlim, ManualResetEventSlim, CountdownEvent и Barrier.
• Маркеры отмены (cancellation tokens) для кооперативной отмены действий (cooperative cancellation).
• Классы ленивой инициализации (lazy initialization classes).
• ThreadLocal .

Для комфортного понимания перед дальнейшим чтением желательно ознакомиться с частями 1..4 документации [2, 3, 4, 5] — в частности для изучения блокировок (locking) и безопасности совместной работы потоков (thread safety).

Все листинги кода секций параллельного программирования доступны как интерактивные примеры в LINQPad [6]. LINQPad это электронный блокнот, идеально подходящий для проверки блоков кода C# без необходимости создания окружающего класса, проекта или решения (имеется в виду среда Microsoft Visual Studio). Для получения доступа к примерам кликните на «Download More Samples» закладки » на закладке» в LINQPad, находящейся слева внизу, и выберите «C# 4.0 in a Nutshell: More Chapters».

[Почему PFX?]

В последнее время тактовые частоты CPU перестали расти, и производители переключились увеличение процессорных ядер. Это создало некоторую проблему для нас как для программистов, потому что стандартный однопоточный код не будет автоматически работать быстрее из-за увеличения числа ядер.

Задействование многих ядер проще реализовать в большинстве серверных приложений, где каждый поток независимо от других потоков работает над обработкой отдельного запроса пользователя, но сложнее то же самое сделать в приложении декстопа — потому что это обычно требует от нагруженного вычислениями кода реализации следующих функций:

1. Разбиение всей вычислительной задачи на малые части.
2. Запуск параллельной обработки этих частей с помощью многопоточности.
3. Собрать все результаты работы вместе, когда они станут доступны.

Последнее нужно сделать таким образом, чтобы не повредить совместной работе потоков ни с точки зрения безопасности и устойчивости (thread-safe), ни с точки производительности. Хотя Вы можете сделать все это классическими конструкциями многопоточности, это будет грубым подходом — в частности на шагах разделения общей задачи на части и последующей сборки результатов работы потоков. Другая проблема в том, что обычная стратегия блокировки для безопасности потоков вызывает большую конкуренцию потоков, когда много потоков работают над одними и теми же данными сразу.

Библиотеки PFX разработаны специально для того, чтобы помочь программировать в этих сценариях.

Программирование с целью задействования нескольких ядер или нескольких процессоров называется параллельным программированием. Это подмножество более широкой концепции многопоточности.

Концепции PFX. Существует 2 стратегии для разделения общей работы между потоками: параллелизм данных (data parallelism) и параллелизм задач (task parallelism).

Когда набор задач нужно выполнить на многих значениях данных, мы можем распараллелить всю задачу так, чтобы каждый поток выполнял (одинаковый) набор действий над подмножеством значений. Такой подход называется data parallelism, потому что мы распределяем обрабатываемые данные между потоками. В отличие от этого task parallelism подразумевает выполнение разными потоками разных задач; другими словами, каждый поток выполняет отличающий от других потоков алгоритм действий.

В общем случае data parallelism проще реализуется и лучше масштабируется для повышения количества ядер, потому что такой подход снижает или вовсе убирает работу потоков над общими данными (благодаря чему снижается конкуренция потоков и легче решать проблемы thread-safety). Также параллелизм данных усиливает тот факт, что чаще есть больше значений данных, чем каких-то дискретных задач, что увеличивает потенциал параллелизма.

Параллелизм данных также способствует структурированному параллелизму, что означает, что параллельные единицы общей работы запускаются и завершаются в одном и том же месте программы. В отличие от этого параллелизм задач имеет тенденцию быть не структурированным, это означает, что элементы параллельной работы могут начаться и завершиться в местах, рассеянных по программе. Структурированный параллелизм проще, меньше подвержен ошибкам, и позволяет Вам обрабатывать трудное задание разделения и координацию потоков (и даже финальное сопоставление результатов) из библиотек.

Компоненты PFX. PFX представляет два слоя функциональности. Верхний слой состоит из двух видов data parallelism API: библиотека PLINQ и класс Parallel. Нижний слой содержит классы параллелизма задач, плюс набор дополнительных конструкций, чтобы помочь с параллельным программированием.

PLINQ предоставляет самый богатый функционал: автоматизация всех шагов параллелизации, включая разделение работы на задачи, запуск выполнение этих задач в потоках и конечное сопоставление результатов в одну выходную последовательность. Она называется декларативной — потому что Вы просто декларируете, что хотите распараллелить свою работу (что структурируете как запрос LINQ), и позволяете платформе позаботиться о деталях реализации. В отличие от такого подхода другой вариант работы императивный, здесь Вам нужно специально написать код для разделения задачи на части и сопоставления результатов обработки. В случае использования класса Parallel Вы должны собрать результаты работы потоков самостоятельно; с конструкциями параллелизма задач Вы должны также самостоятельно разделить работу на части:

Решение для организации
параллельности
Разделение общей
работы на части
Сведение результатов
PLINQ ДА ДА
Класс Parallel ДА нет
Параллелизм задач PFX нет нет

Коллекции конкуренции (concurrent collections) и примитивы циклов опроса (spinning primitives) помогут Вам реализовать на низком уровне действия по параллельному программированию Это важно, потому что PFX была спроектирована для работы только на сегодняшней аппаратуре, но также может потребоваться работать с будущими моделями процессоров, где есть намного больше ядер. Если Вы хотите переместить кучу напиленной древесины, имея 32 рабочих, то самая большая трудность при перемещении древесины состоит в том, чтобы эти рабочие не мешали друг другу. То же самое с делением алгоритма между 32 ядрами: обычные блокировки используются для защиты доступа к общим ресурсам, результирующая блокировка может означать, что только часть этих ядер когда-либо будут на самом деле работать одновременно. Коллекции конкуренции специально настроены для высоко конкурентного доступа, с фокусом на минимизации или устранения блокировок. PLINQ и класс Parallel сами по себе полагаются на коллекции конкуренции и примитивы циклов опроса для эффективного управления работой.

Традиционная многопоточность это один из сценариев, когда многопоточность дает выигрыш даже на одноядерной машине, когда на самом деле нет настоящего параллелизма. Мы рассматривали все это ранее: задачи такого класса призваны сохранить отзывчивость интерфейса пользователя с одновременным выполнением других задач наподобие загрузки веб-страниц.

Некоторые конструкции, которые мы рассмотрим в секциях параллельного программирования, также иногда полезны и для традиционной многопоточности. В частности:

• PLINQ и класс Parallel полезны, когда Вы хотите выполнять операции параллельно и затем ждать их завершения (структурированный параллелизм). Это включает незначительно нагружающие CPU задачи, такие как вызов веб-службы.
• Конструкции параллелизма задач полезны, когда Вы хотите запустить некую операцию в потоке пула, и также для управления работой задачи через продолжения выполнения (continuations) и родительские/дочерние задачи.
• Коллекции конкурентного выполнения иногда применимы, когда Вы хотите организовать потокобезопасную очередь, стек или словарь.
• BlockingCollection предоставляет простой способ реализовать структуры генератор/потребитель данных (producer/consumer).

Когда следует использовать PFX. Изначально PFX предназначен для параллельного программирования: задействование ядер многоядерных процессоров для ускорения работы кода с интенсивными вычислениями.

Сложность в применении многих ядер показывает закон Амдала, который устанавливает, что максимальное улучшение производительности ограничивается порцией кода, которая обязательно должна выполняться последовательно. Например, если две трети времени алгоритма можно распараллелить, Вы не сможете превысить определенный порог производительности, даже если будете иметь в наличии бесконечное количество ядер [7].

Таким образом сначала нужно оценить узкое место, ограничивающее производительность кода. Также стоит определиться, насколько нужны в коде конкретные интенсивные вычисления — часто алгоритмическая оптимизация дает самый большой выигрыш в производительности. Однако недостаток такого подхода в том, что некоторые техники оптимизации усложняют реализацию параллелизма кода.

Самый большой выигрыш дает правильная организация пожалуй самого сложного момента параллелизма — где общая работа может быть просто разделена на части, которые можно эффективно выполнить каким-то алгоритмом (структурированный параллелизм лучше всего подходит для таких проблем). Примерами могут служить задачи по обработке изображений, ray tracing, атаки по подбору в прикладной математике или криптографии. Другой пример проблемы параллельности — реализация оптимизированной версии алгоритма быстрой сортировки — достижение хорошего результата не тривиально, и может потребовать применения не структурированного параллелизма.

[PLINQ]

PLINQ автоматически распараллеливает локальные запросы (LINQ queries). Достоинство PLINQ в том, что её легко использовать — платформа берет на себя заботу разделения общей работы и конечного сведения вместе результатов.

Чтобы использовать PLINQ просто вызовите AsParallel() на входной последовательности, и затем продолжите запрос LINQ, как обычно. Следующий запрос вычисляет простые числа между 3 и 100000 — при полном использовании всех ядер целевой машины:

AsParallel это метод расширения в System.Linq.ParallelEnumerable. Он оборачивает ввод в последовательность на базе ParallelQuery , который запускает операторы запроса LINQ, которые Вы впоследствии вызываете, чтобы связать с альтернативным набором методов расширения, определенных в ParallelEnumerable. Это дает параллельные реализации каждого из стандартных операторов запроса. В сущности это работает по принципу разделения входной последовательности на куски, выполняемые в разных потоках, которые соединяются вместе обратно в одну выходную потребляемую последовательность.

На картинке ниже показан прицип распараллеливания выражения:

Вызов AsSequential() разворачивает последовательность ParallelQuery так, чтобы последующие операторы запроса связали связались со стандартными операторами запроса и выполнились последовательно. Это нужно перед вызовом методов, которые имеют побочные эффекты или не безопасны в условиях многопоточности.

Для операторов запроса, которые принимают две входные последовательности (Join, GroupJoin, Concat, Union, Intersect, Except и Zip), Вы должны применить AsParallel() к обоим входным последовательностям (иначе будет выброшено исключение). Однако Вам не нужно продолжать применять AsParallel к запросу во время его выполнения, потому что операторы запроса PLINQ выводят другую последовательность ParallelQuery. Фактически повторный вызов AsParallel будет не эффективным, потому что вызовет слияние и повторное разделение запроса:

Не все операторы запроса можно эффективно распараллелить. Для тех, которые распараллелить нельзя, PLINQ вместо этого реализует последовательный оператор. PLINQ может также работать последовательно, если подозревает, что издержки параллелизации на самом деле замедлят определенный запрос.

PLINQ подходит только для локальных коллекций: он не работает с LINQ к SQL или Entity Framework, потому что в этих случаях LINQ транслируется в SQL, который затем выполняется на сервере баз данных. Однако Вы можете использовать PLINQ для выполнения дополнительных локальных запросов на результатах, полученных из запросов к базе данных.

Если запрос PLINQ выбрасывает исключение, то оно перебрасывается как AggregateException, у которого свойство InnerExceptions содержит реальное исключение (или исключения). Подробнее см. «Работа с AggregateException».

Так как AsParallel прозрачно распараллеливает запросы LINQ, возникает вопрос: почему Microsoft не делает по умолчанию простую параллелизацию стандартных операторов запроса, т. е. не применяет PLINQ по умолчанию?

Тут несколько причин. Во-первых, для того, чтобы применение PLINQ было полезным, должно присутствовать разумное количество интенсивной вычислительной работы, чтобы передать её на обработку рабочим потокам. Большинство запросов LINQ к объектам выполняются очень быстро, и к ним не только не нужно применять параллелизм, но это также будет вовлекать дополнительные ненужные вычислительные расходы на разделение задачи, сбор результатов вместе, координацию работы потоков, в результате все только замедлится.

• Выход запроса PLINQ query (по умолчанию) может отличаться от запроса LINQ в контексте упорядочивания элементов.
• PLINQ оборачивает исключения в AggregateException (чтобы обработать ситуации выбрасывания множества исключений).
• PLINQ будет давать ненадежные результаты, если запросы вовлекают себя не безопасные для выполнения в потоках методы.

И наконец, PLINQ предлагает довольно много вариантов настройки. Обременение стандартных запросов LINQ к Objects API такими нюансами дополнительно отвлекало бы от выполнения общей задачи.

Баллистика параллельного выполнения. Как и обычные запросы LINQ, запросы PLINQ вычисляются «ленивым» способом. Это означает, что выполнение срабатывает только когда Вы начинаете потреблять результаты запроса — обычно в цикле foreach (хотя это может быть сделано и через оператор преобразования, такой как ToArray, или оператор, который возвращает один элемент или значение).

Однако по мере потребления результатов выполнение осуществляется несколько иначе по сравнению с обычными последовательными запросами. Последовательный запрос выполняется полностью потребителем в стиле «выталкивания» (pull): каждый элемент из входной последовательности выбирается точно тогда, когда этого требует потребитель. Параллельный запрос обычно использует независимые потоки для выборки элементов из входной последовательности немного перед тем, как это понадобилось потребителю (нечто похожее на телевизионный суфлер для дикторов или антишоковый буфер данных в CD-плеерах). Затем он обрабатывает элементы в параллельном запросе через цепочку запроса, сохраняя результаты в маленьком буфере, откуда они могут быть взяты по запросу потребителя. Если потребитель приостановил потребление данных или сразу прекратил их обработку, то процессор запроса также ставит на паузу или останавливает работу, чтобы зря не расходовать время CPU или память.

Вы можете подстроить буферизацию PLINQ вызовом WithMergeOptions после AsParallel. Значение по умолчанию AutoBuffered обычно дает самые лучшие результаты. NotBuffered запрещает буферизацию, что полезно, если Вы хотите увидеть результаты настолько быстро, насколько это возможно; FullyBuffered полностью кэширует весь набор результата перед тем, как он будет доступен потребителю (операторы OrderBy и Reverse в сущности работают по такому методу, как как и операторы элемента, агрегации и преобразования).

PLINQ и порядок следования. Побочный эффект от параллелизации операторов запроса состоит в том, что когда результаты соединяются вместе, то не обязательно они будут в том же порядке, в каком были на входе, что показано в предыдущей диаграмме. Другими словами, больше не соблюдается нормальное сохранение порядка следования данных запросов LINQ.

Если Вам необходимо сохранить порядок следования, то это можно принудительно активировать вызовом AsOrdered() после AsParallel():

Вызов AsOrdered ухудшает производительность с большим количеством элементов, потому что PLINQ должна отслеживать оригинальную позицию каждого элемента.

Вы можете отменить в запросе действие AsOrdered позже вызовом AsUnordered: это вводит «точку случайной перетасовки» что позволяет запросу с этого момента выполняться более эффективно. Если Вам нужно сохранить входную последовательность только для двух первых операторов запроса, то сделайте так:

Поведение AsOrdered не активировано по умолчанию, потому что для большинства запросов оригинальный порядок входных данных не имеет значения. Другими словами, если бы AsOrdered действовало по умолчанию, то Вы применяли бы AsUnordered для большинства своих параллельных запросов для получения самой лучшей производительности (иначе зачем нам параллелизм), что было бы обременительно.

Ограничения PLINQ. Есть несколько практических ограничений на то, что PLINQ мог бы распараллелить. Эти ограничения могут впоследствии быть устранены последующими сервис-паками и более новыми версиями Framework.

Следующие операторы запроса не поддаются параллелизации, если их исходные элементы не находятся в своей исходной позиции индексации:

• Take, TakeWhile, Skip и SkipWhile.

• Индексированные версии Select, SelectMany и ElementAt.

Большинство операторов запроса меняют позицию индексации элементов (включая те, которые удаляют элементы, такие как Where). Это означает, что если Вы хотите использовать такие предшествующие операторы, то они должны быть перед запросом.

Следующие операторы запроса могут быть распараллелены, но используют затратную стратегию разделения, которая иногда может работать медленнее, чем последовательная обработка:

• Join, GroupBy, GroupJoin, Distinct, Union, Intersect и Except.

Отобранные перегрузки оператора Aggregate в своих стандартных инкарнациях не параллелизуются — PLINQ для них предоставляет специальные перегрузки.

Все другие операторы параллелизуются, хотя их использование не дает гарантии, что Ваш запрос будет распараллелен. PLINQ может запустить Ваш запрос последовательно, если заподозрит, что дополнительные затраты на параллелизацию дадут эффект замедления определенного запроса. Вы можете отменить такое поведение и принудительно заставить применять параллельную обработку путем вызова следующего оператора после AsParallel():

Предположим, что нам нужно написать спеллчекер, который быстро обрабатывает большие документы, напрягая все доступные ядра. Путем формулирования нашего алгоритма в виде запроса LINQ мы могли бы очень просто распараллелить его.

Первый шаг состоит в загрузке словаря английских слов в HashSet для эффективного обращения к нему:

ОК, теперь создадим из нашей словарной таблицы тестовый «документ», представляющий массив из миллиона случайных слов. После сборки этого массива введем в него несколько орфографических ошибок:

Теперь мы можем выполнить нашу параллельную проверку орфографии путем тестирования wordsToTest на соответствие словарю wordLookup. PLINQ делает это очень просто:

Вот результат, который покажет LINQPad:

OrderedParallelQuery (2 элемента)
Слово Индекс
woozsh 12345
wubsie 23456

IndexedWord это пользовательская структура, которую мы определили следующим образом:

Метод wordLookup.Contains в предикате дает запросу некую «суть» и делает его достойным параллелизации.

Мы могли бы немного упростить запрос с помощью анонимного типа вместо структуры IndexedWord. Однако это снизило бы производительность, потому что анонимные типы (которые являются классами, и поэтому ссылочными типами) привели бы к выделению памяти в куче и последующий за этим сбор мусора.

Разница в производительности между кэшем и стеком могла бы быть незначительной с последовательными запросами, но с параллельными запросами стековое выделение памяти может быть довольно выгоднее. Причина в том, что стековое выделение памяти очень хорошо параллелизуется (у каждого потока свой собственный стек), в то же время все потоки должны конкурировать друг с другом на куче, что управляется одним менеджером памяти и сборщиком мусора.

Использование ThreadLocal . Давайте расширим наш пример параллелизацией создания самого массива тестовых слов. Структурируем его алгоритм в запрос LINQ, что должно быть просто. Вот так выглядит последовательная версия:

К сожалению, вызов random.Next не потокобезопасен, так что не так просто вставить AsParallel() в этот запрос. Потенциальное решение проблемы — написать функцию, которая будет делать блокировку вокруг вызван random.Next; однако это снизило бы эффективность параллельности. Лучше всего использовать ThreadLocal , чтобы создать отдельный объект Random для каждого потока. Тогда мы сможем распараллелить запрос следующим образом:

Наша дополнительная функция нужна для инстанциации объекта Random. В неё мы передаем хеш-код Guid, чтобы гарантировать, что если два объекта Random будут созданы в короткий промежуток времени, то они приведут к различным последовательностям случайного числа.

Заманчиво было бы перебрать Ваши готовые приложения в поиске запросов LINQ, и поэкспериментировать с ними на предмет параллелизации. Это обычно не будет продуктивным, потому что большинство проблем, для решение которых LINQ лучше всего подходит, выполняются очень быстро, и параллелизация не даст выгоды. Лучше всего найти те места, где интенсивные вычисления CPU составляют узкое место в программе, после чего задать себе вопрос: «можно ли как-то представить эти вычисления в виде запроса LINQ?» (добро пожаловать в побочные эффекты реструктуризации кода, когда LINQ обычно уменьшает код и делает его более читаемым).

PLINQ также хорошо подходит для случаев, когда есть сложности с ручным распараллеливанием вычислений. Это также подходит для структурированных блокирующихся задач, таких как вызов нескольких веб-сервисов сразу (см. «Вызов блокирующих и интенсивных по вводу/выводу функций»).

PLINQ может быть плохим выбором для обработки изображений, потому что сборка вместе миллионов точек в выходную последовательность станет узким местом программы. Вместо этого лучше всего записывать точки прямо массив не обслуживаемого блока памяти, и использовать класс Parallel или параллелизм задач для управления многопоточностью (однако есть возможность победить конечный сбор результатов обработки использованием ForAll. Так делать стоит, если алгоритм обработки изображения естественно укладывается в LINQ).

Функциональная чистота. Поскольку PLINQ запускает Ваш запрос в параллельных потоках, Вы должны остерегаться выполнять не безопасные для потоков операции. В частности, запись в переменные даст побочные эффекты, и поэтому не будет потокобезопасной:


Мы могли бы сделать инкремент i потокобезопасным путем ввода блокировок или использования класса Interlocked [5], но проблема все еще останется, поскольку i не обязательно будет соответствовать позиции входного элемента. И добавление AsOrdered в запрос не решило бы эту проблему, потому что AsOrdered гарантирует только то, что элементы появятся на выходе в том же порядке, в каком они обрабатывались бы последовательно — но это не даст их действительную последовательную обработку.

Вместо этого запрос нужно переписать с использованием индексированной версии Select:

Для лучшей производительности любые методы, вызываемые из операторов запроса, должны быть потокобезопасными, для чего они не должны записывать что-то в поля или свойства (должны быть без побочных эффектов, или функционально чистыми). Если потокобезопасность достигается блокировкой, то параллелизм запроса потенциально будет ограничен — на коэффициент, равный длительности блокировки, поделенной на общее время, потраченное в этой функции.

Вызов блокирующих и интенсивных по вводу/выводу функций. Иногда запрос выполняется долго не из-за загрузки CPU, а из-за ожидания чего-нибудь — такого как ожидание завершения загрузки веб-страницы или ответа аппаратуры. PLINQ может эффективно распараллелить такие запросы, если Вы дадите ему подсказку, вызвав WithDegreeOfParallelism после AsParallel. Например предположим, что мы хотим выполнить пинг шести web-сайтов одновременно. Вместо того, чтобы использовать неуклюжих асинхронных делегатов или вручную запускать 6 потоков, это можно эффективно выполнить запросом PLINQ:

WithDegreeOfParallelism принуждает PLINQ запустить указанное количество потоков (в данном примере 6) одновременно. Это необходимо, когда вызываются такие блокирующие выполнение потока функции, как Ping.Send, потому что PLINQ иначе предполагает, что запрос интенсивно загружает CPU, и выделяет задачи соответствующим образом. Например, на двухядерной машине PLINQ может по умолчанию запустить одновременно только 2 задачи, что в данной ситуации очень нежелательно.

PLINQ обычно обслуживает каждую задачу в потоке согласно выделению в пуле потоков. Вы можете ускорить начальный запуск потоков вызовом ThreadPool.SetMinThreads.

Для другого примера предположим, что мы пишем систему наблюдения, и хотим с повторами комбинировать изображения из 4 камер безопасности в одно общее изображение для отображения на мониторе наблюдения. Представим камеру следующим классом:

Чтобы получить составную картинку, мы должны вызвать GetNextFrame на каждом объекте камеры. Если заранее знать, что эта операция привязана к интенсивному вводу/выводу, то можно ускорить в 4 раза скорость смены кадров на мониторе применением параллелизации — даже на одноядерной машине. PLINQ делает это возможным с минимальными усилиями:

GetNextFrame является блокирующимся методом, поэтому мы используем WithDegreeOfParallelism, чтобы достичь нужной параллельности. В нашем примере блокировка происходит, когда мы вызываем Sleep; в реальной жизни блокировка будет происходить из-за получения изображения из камеры, потому что поток ввода/вывода данных с камеры требует просто ожидания данных и не вовлекает интенсивное использование CPU.

Вызов AsOrdered гарантирует, что картинки будут отображается в определенном порядке. Поскольку здесь только 4 элемента в последовательности, это даст незначительный эффект на снижение производительности.

Изменение степени параллелизма. Вы можете только один раз вызвать WithDegreeOfParallelism в запросе PLINQ. Если нужно вызвать его снова, то следует принудительно слить данные запроса и заново его разделить повторным вызовом AsParallel() в запросе:

Отмена запроса (Cancellation). Отмена запроса PLINQ, когда Вы потребляете его результаты в цикле foreach, осуществляется просто: обычный break в цикле foreach завершит прокрутку цикла, и запрос будет автоматически отменен поскольку перечислитель был неявно уничтожен.

Для запроса, который завершается на оператор преобразования, элемента или агрегации, его можно отменить из другого потока через маркер отмены (cancellation token). Чтобы вставить маркер (token), вызовите WithCancellation после вызова AsParallel, передав в свойстве Token объект CancellationTokenSource. После этого другой объект может вызвать Cancel на источнике маркера, что выбросит исключение OperationCanceledException в потребителе запроса:

PLINQ не делает вытесняющий обрыв работы потока, потому что это опасно (см. «Прекращение работы потока» [5]). Вместо этого при отмене происходит ожидание завершения обработки текущего элемента у каждого рабочего потока перед завершением запроса. Это означает, что любые внешние методы, которые вызывает запрос, отработают до своего завершения.

Оптимизация по выходу. Одно из достоинств PLINQ — удобно соединяются результаты распараллеленной работы в одну выходную последовательность. Хотя иногда все, что требуется — запустить некую функцию по одному разу на каждый элемент:

Если имеет место этот случай, и не имеет значения, в каком порядке будут обработаны элементы, то можно улучшить эффективность PLINQ методом ForAll.

Метод ForAll запускает делегата на каждый выходной элемент ParallelQuery. Это происходит напрямую в PLINQ, пропуская шаги слияния и перечисления результатов. Простейший пример:

Сбор вместе и перечисление результатов не массивно дорогая операция, так что оптимизация ForAll уступает большим выгодам, когда большое количество быстро обрабатываемых входных элементов.

Оптимизация по входу. У PLINQ есть 3 стратегии разделения задачи для передачи входных элементов потокам:

Стратегия Выделение элемента Относительное быстродействие
Chunk partitioning Динамическое Среднее
Range partitioning Статическое Плохое или отличное
Hash partitioning Статическое Плохое

Для операторов запроса, которые требуют сравнения элементов (GroupBy, Join, GroupJoin, Intersect, Except, Union и Distinct), у Вас нет выбора: PLINQ всегда использует hash-разделение. Hash-разделение относительно не эффективное, потому что должно предварительно рассчитать хеш-код для каждого элемента (так что элементы с одинаковыми хеш-кодами можно было обработать в одном потоке). Если Вы решили, что это слишком медленно, то можете только вызвать AsSequential для запрета параллелизма.

Для всех других операторов запроса у Вас есть выбор — использовать разделение по диапазону (range) или по куску (chunk). По умолчанию:

• Если входная последовательность может быть проиндексирована (если это массив, или реализуется IList ), то PLINQ использует range-разделение.
• Иначе PLINQ выберет chunk-разделение.

В сущности range-разделение быстрее на длинных последовательностях, где обработка каждого элемента занимает примерно одинаковое время CPU. Иначе chunk-разделение обычно работает быстрее.

Чтобы принудительно включить range-разделение:

• Если запрос начинается на Enumerable.Range, замените его на ParallelEnumerable.Range.
• Иначе просто вызовите ToList или ToArray на входной последовательности (очевидно это снизит общую производительность, что следует учитывать).

ParallelEnumerable.Range это не просто ярлычок для вызова Enumerable.Range(. ).AsParallel(). Это меняет производительность запроса путем активации range-разделения.

Чтобы принудительно включить chunk-разделение, оберните входную последовательность в вызов Partitioner.Create (находится в System.Collection.Concurrent) следующим образом:

Второй аргумент Partitioner.Create показывает, что Вы хотите сбалансировать запрос по нагрузке, что является другим способом указать, что нужно применить chunk-разделение.

Chunk-разделение работает таким образом, что каждый рабочий поток для обработки периодически берет малые порции элементов (chunks) из входной последовательности. PLINQ начинает с выделения очень маленьких кусков (из одного или двух элементов за один раз), затем увеличивает размер куска по мере выполнения запроса: это гарантирует, что малые последовательности будут эффективно распараллелены, и большие последовательности не приведут к чрезмерным расщеплениям входных данных. Если произойдет так, что рабочий поток получает «простые» элементы (которые обработаются быстро), то это закончится тем, что он получит больше кусков. Такая система удерживает каждый поток одинаково нагруженным (ядра получаются «сбалансированными» по нагрузке); недостаток в том, что выборка элементов из общей входной последовательности требует синхронизации (обычно исключительной блокировки), что может привести к некоторым дополнительным тратам по вычислениям и конкуренции потоков.

Range-разделение пропускает обычное перечисление по входу, и предварительно выделяет одинаковое количество элементов для каждого потока, избегая состязания на входной последовательности. Но если произойдет так, что некоторые потоки получат простые элементы, и выполнят свою обработку быстрее, то они останутся в ожидании завершения обработки в остальных потоках. Наш предыдущий пример с числовыми вычислениями может показать плохую эффективность при range-разделении. Примером, когда range-разделение работает хорошо, будет вычисление суммы квадратных корней первых 10 миллионов целых чисел:

ParallelEnumerable.Range вернет ParallelQuery , так что Вам не нужно впоследствии вызывать AsParallel.

Range-разделение не обязательно будет выделять диапазоны элементов в непрерывных блоках — вместо этого может быть выбрана стратегия чередования (striping). Например, если есть два рабочих потока, то один может обрабатывать элементы с четными номерами, в то время как другой процесс с нечетными. Оператор TakeWhile почти наверняка инициирует striping-стратегию, чтобы избежать нежелательной обработки элементов в последовательности.

Параллелизация пользовательских агрегаций. PLINQ эффективно распараллеливает операторы Sum, Average, Min и Max без дополнительного вмешательства. Однако оператор Aggregate представляет для PLINQ дополнительные сложности.

Если Вы не знакомы с этим оператором, то можете думать об Aggregate как про обобщенную версию Sum, Average, Min и Max — другими словами, это оператор, который позволят Вам подключить пользовательский алгоритм накопления для реализации необычных агрегаций. Следующий код демонстрирует, как Aggregate может выполнить работу Sum:

Первый аргумент для Aggregate это начальное значение (seed), с которого начинается накопление. Второй аргумент это выражение для обновления накапливаемого значения с учетом нового элемента. Опционально Вы можете предоставить третий аргумент для проектирования конечного результата из накопленного значения.

Большинство проблем, для которых был разработан Aggregate, можно решить проще с циклом foreach, с более знакомым синтаксисом. Достоинство Aggregate в том, что большие и сложные агрегации можно декларативно распараллелить с помощью PLINQ.

Агрегации без начальной точки отсчета (unseeded). Вы может опустить значение seed при вызове Aggregate, в этом случае первый элемент станет неявным seed, и агрегация начнется со второго элемента. Вот предыдущий пример, переделанный на unseeded:

Это даст тот же результат, что и ранее, но реально мы делаем другое вычисление. Раньше мы выполнили вычисление 0+1+2+3; теперь же 1+2+3. Мы можем лучше проиллюстрировать разницу применением умножения вместо сложения:

Как мы скоро увидим, unseeded-агрегации имеют преимущество для параллелизации, так как не требуют использования специальных перегрузок. Но с unseeded-агрегациями можно попасть в ловушку: их методы предназначены для использования с делегатами, которые являются коммутативными и ассоциативными. При другом использовании любой результат будет не интуитивным (с обычными запросами) или не детерминированным (в случае, когда Вы распараллелили запрос с помощью PLINQ). Для примера рассмотрим следующую функцию:

Она ни коммутативна, ни ассоциативна (т. е. 1+2*2 != 2+1*1). Посмотрим, что получится, когда мы используем её для сложения квадратов чисел 2, 3 и 4:

Вместо того, чтобы получить 29 как результат 2*2 + 3*3 + 4*4, получится 27 в результате вычисления 2 + 3*3 + 4*4. Мы можем исправить это несколькими способами. Первый способ — добавить 0 в качестве первого элемента:

Однако это не только не элегантно, но все еще даст некорректные результаты при параллелизации, потому что PLINQ усиливает подразумеваемую ассоциативность функции, выбирая несколько элементов как seed-значения. Для иллюстрации мы обозначим нашу функцию агрегации следующим образом:

тогда запрос LINQ к объектам вычислил бы f(f(f(0, 2),3),4), в то время как PLINQ мог бы сделать f(f(0,2),f(3,4)) со следующим результатом:

Первая часть распараллеливания: a = 0 + 2*2 (= 4)
Вторая часть распараллеливания: b = 3 + 4*4 (= 19)
Конечный результат: a + b*b (= 365)
ИЛИ ДАЖЕ: b + a*a (= 35)

Есть 2 хороших решения. Первое — превратить это в seeded-агрегацию с нулем для seed. Единственная сложность в том, что с PLINQ нам нужно использовать специальную перегрузку, чтобы запрос не выполнятся последовательно (скоро мы это увидим).

Второе решение реструктурировать запрос так, чтобы функция агрегации стала коммутативной и ассоциативной:

Конечно, в таком простом сценарии Вы можете (и должны) использовать оператор Sum вместо Aggregate:

Вы можете на самом деле пойти довольно далеко просто с Sum и Average. Например, можно использовать Average для вычисления среднего квадратического значения (root-mean-square, RMS):

и даже стандартную девиацию:

Оба варианта безопасны, эффективным и полностью параллелизуются.

Параллелизация Aggregate. Мы только что говорили, что для unseeded-агрегаций предоставленный делегат должен быть ассоциативным и коммутативным. PLINQ даст некорректные результаты, если это правило нарушается, потому что это вовлечет несколько seed-значений из входной последовательности, чтобы одновременно агрегировать несколько частей последовательности.

Явные seed-агрегации могли бы выглядеть как безопасная опция с PLINQ, но к сожалению обычно они выполняются последовательно, потому что полагаются на одно значение seed. Чтобы смягчить это, PLINQ предоставляет другую перегрузку Aggregate, которая позволяет указать несколько значений seed — или скорее разработанную функцию для seed. Для каждого потока эта функция выполнится, чтобы генерировать отдельное значение seed, которое становится локальным аккумулятором для потока, в который будет выполняться агрегация элементов.

Мы также должны предоставить функцию для индикации, как комбинировать локальный и главный аккумуляторы. И наконец, эта перегрузка Aggregate (немного необоснованно) ожидает делегата для выполнения любого конечного преобразования для результата (того же самого результата Вы можете достичь, самостоятельно вызвав какую-нибудь функцию преобразования позже). Итак, имеется 4 делегата, передаваемых в таком порядке:

seedFactory. Вернет новый локальный аккумулятор.
updateAccumulatorFunc. Агрегирует элемент в локальный аккумулятор.
combineAccumulatorFunc. Комбинирует локальный аккумулятор с главным аккумулятором.
resultSelector. Применяет любое последнее преобразования к конечному результату.

В простых сценариях Вы можете указать значение seed вместо функции seed (вместо seedFactory). Эта тактика не сработает, когда seed является ссылочным типом, который Вы хотите мутировать, потому что один и тот же экземпляр тогда будет использоваться каждым потоком.

В качестве очень простого примера рассмотрим следующие суммы значений в массиве чисел:

В этом примере мы могли бы получить тот же ответ, используя более простые способы (такие как агрегация unseeded, или что еще лучше, оператор Sum). Чтобы дать более реалистичный пример, предположим, что мы хотим вычислить частоту появления каждой буквы английского алфавита в указанной строке. Простое последовательное решение может выглядеть вот так:

Пример, когда строка будет очень большой — вычисление генома. Тогда «алфавит» должен состоять из букв a, c, g и t.

Чтобы распараллелить это, мы должны заменить оператор foreach на вызов Parallel.ForEach (что мы рассмотрим в следующей секции), но тут мы столкнемся с проблемой конкурентной обработки общего массива. И блокировка вокруг доступа к этому массив дала бы безопасность для потоков, но убила бы весь потенциал параллелизации.

Агрегат предоставляет опрятное решение. В этом случае в аккумулятор это массив, такой же как letterFrequencies из предыдущего примера. Вот последовательная версия, использующая Aggregate:

И теперь параллельная версия, использующая специальную перегрузку PLINQ:

Обратите внимание, что функция локального накопления мутирует массив localFrequencies. Это важная и законная возможность для выполнения оптимизации, потому что localFrequencies локален для каждого потока.

[Класс Parallel]

PFX предоставляет базовую форму структурированного параллелизма через три статические метода в классе Parallel:

Parallel.Invoke . Запускает параллельно делегатов из массива.
Parallel.For . Выполняет параллельный эквивалент цикла for языка C#.
Parallel.ForEach . Выполняет параллельный эквивалент цикла foreach языка C#.

Все три метода блокируют выполнение до момента завершения работы. Как с PLINQ, после не обработанного исключения оставшиеся рабочие потоки останавливаются после своей текущей итерации и исключение (или исключения) выбрасываются обратно в вызывающий код — обернутое в AggregateException.

Parallel.Invoke . Parallel.Invoke параллельно запускает в массиве делегатов Action, и затем ждет их завершения. Самая простая версия метода определена следующим образом:

Вот так мы можем использовать Parallel.Invoke, чтобы одновременно загрузить две веб-странички:

Внешне это выглядит как удобный ярлычок для создания и ожидания двух объектов Task (или асинхронных делегатов). Но здесь есть важное отличие: Parallel.Invoke все еще работает эффективно, если Вы передадите в массиве миллион делегатов. Это потому, что Parallel.Invoke делит большое количество элементов на пакеты, которые присваивает нижележащим задачам Task — вместо того, чтобы создавать отдельную Task для каждого делегата.

Как и со всеми методами Parallel, Вы действуете самостоятельно при сведении вместе результатов обработки. Это означает, что Вам нужно учитывать сохранение безопасности потоков. Следующий код, к примеру, не потокобезопасный:

Блокировка вокруг этого списка List исправит проблему, хотя блокировка создаст узкое место, если у Вас очень большой массив быстро исполняемых делегатов. Есть решение лучше — использовать thread-safe collection, такую как ConcurrentBag, что идеально для этого случая.

Parallel.Invoke также перегружается для того, чтобы принять объект ParallelOptions:

Вместе с ParallelOptions Вы можете вставить маркер отмены (cancellation token), ограничить максимальную конкуренцию и указать пользовательский планировщик задач (custom task scheduler). Маркер отмены уместен, когда Вы выполняете (грубо) больше задач, чем имеется в наличии ядер: при отмене любые не запущенные делегаты будут заброшены. Однако любые уже выполняющиеся делегаты будут продолжать свое выполнение до завершения. См. «Отмена запроса (Cancellation)» в [4] для примера использования маркеров отмены.

Мастер Йода рекомендует:  Как вставить картинку в HTML
Добавить комментарий