оператор lock — обеспечение монопольного доступа к общему ресурсу
Оператор lock получает взаимоисключающую блокировку заданного объекта перед выполнением определенных операторов, а затем снимает блокировку. Во время блокировки поток, удерживающий блокировку, может снова поставить и снять блокировку. Любой другой поток не может получить блокировку и ожидает ее снятия. Оператор lock гарантирует, что в любой момент времени только один поток выполняет свой текст.
Оператор lock имеет форму
lock (x) < // Your code. >
Здесь x — это выражение ссылочного типа. Оно является точным эквивалентом
object __lockObj = x; bool __lockWasTaken = false; try < System.Threading.Monitor.Enter(__lockObj, ref __lockWasTaken); // Your code. >finally
Так как в коде используется try-finally оператор , блокировка освобождается, даже если в тексте lock инструкции возникает исключение.
Выражение нельзя использовать await в теле lock оператора.
Рекомендации
При синхронизации доступа потоков к общему ресурсу блокируйте выделенный экземпляр объекта (например, private readonly object balanceLock = new object(); ) или другой экземпляр, который, скорее всего, не будет использоваться как объект блокировки другими частями кода. Не используйте один и тот же экземпляр объекта блокировки для разных общих ресурсов: это может привести к взаимоблокировке или состязанию при блокировке. В частности, не используйте следующие экземпляры в качестве объектов блокировки:
- this , так как он может использоваться вызывающими объектами как блокировка;
- Type экземпляры, так как они могут быть получены оператором typeof или отражением.
- экземпляры строк, включая строковые литералы, так как они могут быть интернированы.
Удерживайте блокировку в течение максимально короткого времени, чтобы сократить число конфликтов при блокировке.
Пример
В следующем примере определяется класс Account , который синхронизирует доступ к закрытому полю balance путем блокировки выделенного экземпляра balanceLock . Использование одного и того же экземпляра для блокировки гарантирует, что balance поле не может быть обновлено одновременно двумя потоками, пытающимися вызвать Debit методы или Credit одновременно.
using System; using System.Threading.Tasks; public class Account < private readonly object balanceLock = new object(); private decimal balance; public Account(decimal initialBalance) =>balance = initialBalance; public decimal Debit(decimal amount) < if (amount < 0) < throw new ArgumentOutOfRangeException(nameof(amount), "The debit amount cannot be negative."); >decimal appliedAmount = 0; lock (balanceLock) < if (balance >= amount) < balance -= amount; appliedAmount = amount; >> return appliedAmount; > public void Credit(decimal amount) < if (amount < 0) < throw new ArgumentOutOfRangeException(nameof(amount), "The credit amount cannot be negative."); >lock (balanceLock) < balance += amount; >> public decimal GetBalance() < lock (balanceLock) < return balance; >> > class AccountTest < static async Task Main() < var account = new Account(1000); var tasks = new Task[100]; for (int i = 0; i < tasks.Length; i++) < tasks[i] = Task.Run(() =>Update(account)); > await Task.WhenAll(tasks); Console.WriteLine($"Account's balance is "); // Output: // Account's balance is 2000 > static void Update(Account account) < decimal[] amounts = [0, 2, -3, 6, -2, -1, 8, -5, 11, -6]; foreach (var amount in amounts) < if (amount >= 0) < account.Credit(amount); >else < account.Debit(Math.Abs(amount)); >> > >
Спецификация языка C#
Дополнительные сведения см. в разделе об инструкции lock в документации Предварительная спецификация C# 6.0.
См. также
- справочник по C#
- System.Threading.Monitor
- System.Threading.SpinLock
- System.Threading.Interlocked
- Обзор примитивов синхронизации
- Общие сведения о System.Threading.Channels
Совместная работа с нами на GitHub
Источник этого содержимого можно найти на GitHub, где также можно создавать и просматривать проблемы и запросы на вытягивание. Дополнительные сведения см. в нашем руководстве для участников.
Класс RLock() модуля threading в Python
Класс RLock() модуля threading реализует объекты реентерабельной (повторной) блокировки.
Повторяющаяся блокировка должна быть снята потоком, который ее получил. Как только поток получил повторную блокировку, тот же поток может получить ее снова без блокировки. Поток должен освобождать ее столько раз, сколько раз он ее приобрел.
Обратите внимание, что threading.RLock на самом деле является фабричным классом, который возвращает экземпляр наиболее эффективной версии конкретного класса threading.RLock , поддерживаемого платформой.
Повторная блокировка потока — это примитив синхронизации, который может быть получен несколько раз одним и тем же потоком. Внутри он использует концепции «владеющего потоком» и «уровня рекурсии» в дополнение к locked / unlocked состоянию, используемому примитивными блокировками threading.Lock . В заблокированном locked состоянии какой-то поток владеет блокировкой, в разблокированном unlocked состоянии ни один поток не владеет им.
Чтобы включить блокировку, поток вызывает свой метод RLock.acquire() , он возвращает результат своему экземпляру, когда поток владеет блокировкой. Чтобы снять блокировку, поток вызывает свой метод RLock.release() .
Пары вызовов RLock.acquire() / RLock.release() могут быть вложенными, только последний RLock.release() ( release() самой внешней пары) сбрасывает режим locked на unlocked и позволяет продолжить работу другому потоку, заблокированному в RLock.acquire() .
Класс threading.RLock() также поддерживают протокол управления контекстом.
Методы объекта threading.RLock .
- RLock.acquire() устанавливает блокировку,
- RLock.release() снимает блокировку,
- Пример работы повторной блокировки threading.RLock() ,
RLock.acquire(blocking=True, timeout=-1) :
Метод RLock.acquire() устанавливает блокировку, блокирующую или неблокирующую..
При вызове без аргументов:
- Если этот поток уже владеет блокировкой, то увеличивает уровень рекурсии на единицу и немедленно возвращает результат своему экземпляру.
- Если другой поток владеет блокировкой, то текущий поток блокируется, пока не будет снята блокировка. Как только блокировка снята (не принадлежит ни одному потоку), то захватывает владение блокировкой, устанавливает уровень рекурсии на единицу и возвращает результат своему экземпляру.
- Если более одного потока находятся в ожидании снятия блокировки, то только один из них сможет получить право владения блокировкой. В этом случае нет возвращаемого значения.
При вызове метода с параметром blocking , установленным в значение True , выполнит то же действие, что и при вызове без аргументов и возвратит значение True .
При вызове метода с аргументом blocking , установленным в False , не ставит блокировку, а проверит, сможет ли метод с blocking=True поставить блокировку, если нет, то немедленно вернет False , в противном случае установит блокировку и возвратит True .
При вызове с аргументом тайм-аута timeout с числом float , установленным в положительное значение, будет блокировать выполнение кода не более чем на количество секунд, заданное таймаутом и до тех пор, пока блокировка не будет получена. В этом случае, возвращает True , если блокировка была получена и False , если истекло время ожидания timeout .
RLock.release() :
Метод RLock.release() снимает блокировку, уменьшив уровень рекурсии. Если после декремента он равен нулю, то сбрасывает блокировку на unlocked (т.е. блокировка не принадлежащую ни одному потоку), и если какие-либо другие потоки заблокированы, ожидая разблокировки, то разрешит выполнение ровно одному из них. Если после декремента уровень рекурсии все еще отличен от нуля, то блокировка остается locked и принадлежит вызывающему потоку.
Вызывайте этот метод только тогда, когда вызывающий поток владеет блокировкой. Если этот метод вызывается при уже снятой блокировке, то возникает исключение RuntimeError
Возвращаемого значения нет.
Пример работы повторной блокировки threading.RLock() .
Объекты threading.Lock() не могут быть получены более одного раза, даже одним и тем же потоком. Это может привести к нежелательным побочным эффектам, если доступ к блокировке осуществляется более чем одной функцией в одной цепочке вызовов.
В этом случае второму вызову Lock.acquire() необходимо дать нулевой тайм-аут, чтобы предотвратить его блокировку, потому что блокировка была уже получена первым вызовом.
>>> import threading >>> lock = threading.Lock() >>> lock # # первая блокировка >>> 'First try :', lock.acquire() # ('First try :', True) >>> lock # # вторая блокировка 'Second try:', lock.acquire(timeout=0) # ('Second try:', False)
В ситуации, когда отдельный код из одного и того же потока должен “повторно получить” блокировку, необходимо использовать объекты threading.RLock .
Единственным изменением в коде из предыдущего примера была замена объекта Rlock на Lock.
>>> import threading >>> rlock = threading.RLock() # первая блокировка >>> 'First try :', rlock.acquire() # ('First try :', True) # вторая блокировка >>>'Second try:', rlock.acquire(timeout=0) # ('Second try:', True) # смотрим состояние - счетчик повторных блокировок count=2 >>> rlock # # разблокируем 2 раза >>> rlock.release() >>> rlock.release() # смотрим состояние - счетчик повторных блокировок count=0 >>> rlock # # пробуем разблокировать уже разблокированное состояние >>> rlock.release() # Traceback (most recent call last): # File "", line 1, in # RuntimeError: cannot release un-acquired lock
- КРАТКИЙ ОБЗОР МАТЕРИАЛА.
- Получение общих сведений о потоках, модуль threading
- Класс Thread() модуля threading
- Класс local() модуля threading
- Класс Event() модуля threading
- Класс Lock() модуля threading
- Класс RLock() модуля threading
- Класс Condition() модуля threading
- Класс Semaphore() модуля threading
- Класс Timer() модуля threading
- Класс Barrier() модуля threading
- Протокол управления контекстом в модуле threading
- Трассировка и профилирование потоков модулем threading
- Как перезапускать потоки?
Для чего нужен threading lock
Нередко в потоках используются некоторые разделяемые ресурсы, общие для всей программы. Это могут быть общие переменные, файлы, другие ресурсы. Например:
int x = 0; // запускаем пять потоков for (int i = 1; i < 6; i++) < Thread myThread = new(Print); myThread.Name = $"Поток "; // устанавливаем имя для каждого потока myThread.Start(); > void Print() < x = 1; for (int i = 1; i < 6; i++) < Console.WriteLine($": "); x++; Thread.Sleep(100); > >
Здесь у нас запускаются пять потоков, которые вызывают метод Print и которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 5. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым. Например, в моем случае я получил следующий консольный вывод (он может в каждом конкретном случае различаться):
Поток 1: 1 Поток 5: 1 Поток 4: 1 Поток 2: 1 Поток 3: 1 Поток 1: 6 Поток 5: 7 Поток 3: 7 Поток 2: 7 Поток 4: 9 Поток 1: 11 Поток 4: 11 Поток 2: 11 Поток 3: 14 Поток 5: 11 Поток 1: 16 Поток 2: 16 Поток 3: 16 Поток 5: 18 Поток 4: 16 Поток 1: 21 Поток 5: 21 Поток 3: 21 Поток 2: 21 Поток 4: 21
Решение проблемы состоит в том, чтобы синхронизировать потоки и ограничить доступ к разделяемым ресурсам на время их использования каким-нибудь потоком. Для этого используется ключевое слово lock . Оператор lock определяет блок кода, внутри которого весь код блокируется и становится недоступным для других потоков до завершения работы текущего потока. Остальный потоки помещаются в очередь ожидания и ждут, пока текущий поток не освободит данный блок кода. В итоге с помощью lock мы можем переделать предыдущий пример следующим образом:
int x = 0; object locker = new(); // объект-заглушка // запускаем пять потоков for (int i = 1; i < 6; i++) < Thread myThread = new(Print); myThread.Name = $"Поток "; myThread.Start(); > void Print() < lock (locker) < x = 1; for (int i = 1; i < 6; i++) < Console.WriteLine($": "); x++; Thread.Sleep(100); > > >
Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker . Обычно это переменная типа object. И когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.
В этом случае консольный вывод будет более упорядоченным:
Поток 1: 1 Поток 1: 2 Поток 1: 3 Поток 1: 4 Поток 1: 5 Поток 5: 1 Поток 5: 2 Поток 5: 3 Поток 5: 4 Поток 5: 5 Поток 3: 1 Поток 3: 2 Поток 3: 3 Поток 3: 4 Поток 3: 5 Поток 2: 1 Поток 2: 2 Поток 2: 3 Поток 2: 4 Поток 2: 5 Поток 4: 1 Поток 4: 2 Поток 4: 3 Поток 4: 4 Поток 4: 5
Класс Lock() модуля threading в Python
Класс Lock() модуля threading , реализует примитивные объекты блокировки. Как только поток получил блокировку, то последующие попытки получить его блокируются, пока поток не будет разблокирован. Любой поток может снять блокировку.
Обратите внимание, что класс threading.Lock() на самом деле является фабричным, который возвращает экземпляр наиболее эффективной версии конкретного класса threading.Lock() , поддерживаемого платформой.
Примитивная блокировка потока — это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python в настоящее время это самый низкий из доступных примитивов синхронизации.
Примитивная блокировка находится в одном из двух состояний: «locked» или «unlocked«. Экземпляр класса создается в разблокированном «unlocked» состоянии. У него есть два основных метода: Lock.acquire() и Lock.release() .
Когда состояние «unlocked», то метод Lock.acquire() изменяет состояние на «locked» и немедленно возвращает результат экземпляру. Когда состояние «locked», то вызов метода Lock.acquire() блокируется до тех пор, пока вызов метода Lock.release() в другом потоке не изменит его на «unlocked», затем вызов Lock.acquire() сбрасывает его в «locked» и снова возвращает результат экземпляру.
Метод Lock.release() следует вызывать только когда экземпляр Lock находится в состоянии «locked». Этот метод меняет состояние экземпляра Lock на «unlocked» и немедленно возвращает результат экземпляру. Если будет сделана попытка вызвать Lock.release() для состояния «unlocked», то будет вызвана ошибка RuntimeError .
Класс threading.Lock() также поддерживают протокол управления контекстом.
Когда в Lock.acquire() блокируется более одного потока, ожидающего перехода состояния в «unlocked», то после вызова Lock.release() только один поток переходит в состояние «unlocked». Какой из ожидающих потоков начинает работать, не определено и может варьироваться в разных реализациях.
Методы объекта threading.Lock .
Все методы выполняются атомарно.
- Lock.acquire() устанавливает блокировку,
- Lock.release() снимает блокировку,
- Lock.locked() проверяет состояние блокировки,
- Общий пример использования блокировки,
- Применение блокировки при многопоточном объединении файлов.
Lock.acquire(blocking=True, timeout=-1) :
Метод Lock.acquire() устанавливает блокировку, блокирующую или неблокирующую.
При вызове метода с аргументом blocking , установленным в True (по умолчанию) — блокирует потоки до тех пор, пока блокировка не будет снята, затем снова установит ее в состояние «locked» и вернет True .
При вызове метода с аргументом blocking , установленным в False , не ставит блокировку, а проверит, сможет ли метод с blocking=True поставить блокировку, если нет, то немедленно вернет False , в противном случае установит блокировку и возвратит True .
При вызове с аргументом timeout (значение может быть float ), установленным в положительное значение, будет блокировать выполнение кода не более чем на количество секунд, заданное величиной timeout и до тех пор, пока блокировка не будет получена.
Аргумент тайм-аута timeout , равный -1, указывает на неограниченное ожидание. Запрещается указывать тайм-аут timeout при ложной блокировке.
Метод Lock.acquire() возвращает значение True , если блокировка получена успешно и False , если нет (например, если истекло время ожидания timeout ).
Lock.release() :
Метод Lock.release() снимает блокировку. Метод может быть вызван из любого потока, а не только из потока, который получил блокировку.
Когда блокировка включена, этот метод сбрасывает его до состояния «unlocked» и возвращает результат своему экземпляру. Если какие-либо другие потоки заблокированы, ожидая снятия блокировки, разрешает выполнение ровно одному из них.
При вызове метода при снятой блокировки возникает исключение RuntimeError .
Метод ничего не возвращает.
Lock.locked() :
Метод Lock.locked() возвращает True , если блокировка получена.
Общие примеры использования блокировок threading.Lock() .
Важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. д.) являются поточно-ориентированными, поскольку глобальная блокировка интерпретатора GIL, используемая для защиты внутренних структур данных Python, не снимается в середине обновления.
Другие структуры данных, реализованные в Python или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту нескольких потоков используйте объект threading.Lock() .
В примере функция worker() увеличивает экземпляр счетчика, который управляет блокировкой, чтобы предотвратить одновременное изменение внутреннего состояния двух потоков. Если бы блокировка не использовалась, то была бы вероятность пропуска изменения атрибута value .
import threading, random, time class Counter(): def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): th_name = threading.current_thread().name print(f'Th: th_name> - ждет блокировку') self.lock.acquire() try: print(f'Th: th_name> - получил блокировку') self.value = self.value + 1 finally: self.lock.release() def worker(c): for i in range(2): pause = random.random() th_name = threading.current_thread().name print(f'Th: th_name> - заснул на pause:0.02f>') time.sleep(pause) c.increment() print(f'Th: th_name> - сделано.') counter = Counter() for i in range(2): t = threading.Thread(target=worker, args=(counter,)) t.start() print('Ожидание рабочих потоков') main_thread = threading.main_thread() for t in threading.enumerate(): if t is not main_thread: t.join() print(f'Счетчик: counter.value>') # Th: Thread-1 - заснул на 0.34 # Th: Thread-2 - заснул на 0.26 # Ожидание рабочих потоков # Th: Thread-2 - ждет блокировку # Th: Thread-2 - получил блокировку # Th: Thread-2 - заснул на 0.34 # Th: Thread-1 - ждет блокировку # Th: Thread-1 - получил блокировку # Th: Thread-1 - заснул на 0.33 # Th: Thread-2 - ждет блокировку # Th: Thread-2 - получил блокировку # Th: Thread-2 - сделано. # Th: Thread-1 - ждет блокировку # Th: Thread-1 - получил блокировку # Th: Thread-1 - сделано. # Счетчик: 4
Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, необходимо передать False для аргумента blocking в методе Lock.acquire() . В следующем примере worker() пытается получить блокировку три раза и подсчитывает, сколько для этого нужно сделать попыток. Между тем, lock_holder() циклически переключает блокировку между удержанием и снятием блокировки с короткими паузами в каждом состоянии, для имитации нагрузки.
Функции worker() требуется более трех итераций, чтобы получить блокировку три отдельных раза.
import logging, threading, time def lock_holder(lock): logging.debug('Запуск') while True: lock.acquire() try: logging.debug('Нагрузка. ') time.sleep(0.5) finally: logging.debug('Работа закончена') lock.release() time.sleep(0.5) def worker(lock): logging.debug('Запуск') num_tries = 0 num_acquires = 0 while num_acquires 3: time.sleep(0.5) logging.debug('Попытка блокировки ресурса') have_it = lock.acquire(blocking=False) try: num_tries += 1 if have_it: logging.debug(f'Попытка №num_tries>: ПОЛУЧИЛОСЬ') num_acquires += 1 else: logging.debug(f'Попытка №num_tries>: НЕ ПОЛУЧИЛОСЬ') finally: if have_it: lock.release() logging.debug('Ресурс успешно блокирован 3 раза, после попыток. ') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() holder = threading.Thread( target=lock_holder, args=(lock,), name='LockHolder', daemon=True, ) holder.start() worker = threading.Thread( target=worker, args=(lock,), name='Worker', ) worker.start() # (LockHolder) Запуск # (LockHolder) Нагрузка. # (Worker ) Запуск # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №1: ПОЛУЧИЛОСЬ # (LockHolder) Нагрузка. # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №2: НЕ ПОЛУЧИЛОСЬ # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №3: ПОЛУЧИЛОСЬ # (LockHolder) Нагрузка. # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №4: НЕ ПОЛУЧИЛОСЬ # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №5: ПОЛУЧИЛОСЬ # (Worker ) Ресурс успешно блокирован 3 раза, после 5 попыток.
Пример блокировки общего ресурса при многопоточном объединении файлов.
В этом примере выбираются все текстовые файлы из директории test_dir и объединяются в один multi-thead-file.txt . Программа читает и обрабатывает файлы из каталога test_dir и пишет в общий файл multi-thead-file.txt в несколько потоков.
Предупреждение. Выполнение этого кода не даст прирост производительности по сравнению с однопоточным режимом, т.к. запись в общий ресурс блокируется и по сути программа становиться однопоточной. Этот пример приведен чисто в учебных целях, что бы понять как организовать доступ к общему ресурсу из разных потоков не нарушая его целостности и увидеть применение класса threading.Lock() на практическом примере.
Здесь блокировка threading.Lock() используется, для того, что бы предотвратить одновременный доступ из других потоков к файлу с общими данными multi-thead-file.txt . Если убрать блокировку, то в итоговом файле можно увидеть, что строки из разных файлов перемешаны.
Места в коде, где используется блокировка, помечены . с обоих концов комментария.
Что бы запустить данный пример, необходимо подготовить данные скриптом prepare-data.py , приведенным в обзорной статье к модулю threading или использовать свои текстовые файлы (измените переменную test_dir ).
import pathlib, threading, time, queue class Worker(threading.Thread): def __init__(self, que, write_file, lock): super().__init__() self.daemon = True self.files_queue = files_queue self.write_file = write_file self.lock = lock # переопределяем метод def run(self): while True: # . блокируем доступ к файлу из других потоков, # . что бы строки не писались вперемешку из других # . открытых файлов. Для блокировки/разблокировки # . используем менеджер контекста with self.lock: # Получаем задание (имя файла) из очереди job = self.files_queue.get() print(f'Th:self.name> обработка job.name>') # открываем файл `job` на чтение, а `write_file` на # ДОБАВЛЕНИЕ 'a+' данных к файлу с общими данными with open(job, 'r') as fread, open(self.write_file, 'a+') as fwrite: # пишем имя открытого файла fwrite.write(f'\n\nДанные из файла: job.name>\n\n') # читаем данные построчно (экономим память) for line in fread: # здесь обрабатываем строку, # например, заменим букву у на 0 line = line.replace('у', '0') # потом пишем в файл fwrite.write(line) # Сообщаем очереди, что задача выполнена self.files_queue.task_done() path = pathlib.Path('.') # каталог с файлами test_dir = 'test_dir' path_dir = path.joinpath(test_dir) # список файлов list_files = path_dir.glob('*.txt') # создаем и заполняем очередь именами файлов files_queue = queue.Queue() for file in list_files: files_queue.put(file) # общий файл данных write_file = 'multi-thead-file.txt' if files_queue.empty(): print('НЕТ файлов для обработки.') else: # . создаем блокировщик . lock = threading.Lock() # Создаем и запускаем, например 3 потока for _ in range(3): th = Worker(files_queue, write_file, lock) th.start() # Блокируем выполнение программы до тех пор пока # потоки не обслужат все элементы очереди files_queue.join()
- КРАТКИЙ ОБЗОР МАТЕРИАЛА.
- Получение общих сведений о потоках, модуль threading
- Класс Thread() модуля threading
- Класс local() модуля threading
- Класс Event() модуля threading
- Класс Lock() модуля threading
- Класс RLock() модуля threading
- Класс Condition() модуля threading
- Класс Semaphore() модуля threading
- Класс Timer() модуля threading
- Класс Barrier() модуля threading
- Протокол управления контекстом в модуле threading
- Трассировка и профилирование потоков модулем threading
- Как перезапускать потоки?