Python中如何实现基于队列的分布式锁

2023-04-11 00:00:00 分布式 队列 如何实现

基于队列的分布式锁,通常会用到Redis作为队列的数据存储和消息传递的中间件。下面的示例通过Redis的 SET 命令(NX + 过期时间)配合轮询重试来获取锁,并在释放锁时校验持有者的唯一标识。具体实现方式如下:

  1. 创建Redis连接
import redis

# Module-level Redis client used by acquire_lock() and release_lock();
# it targets a server on localhost, port 6379, database 0.
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
  2. 定义加锁函数
def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=None):
    """Try to acquire the lock ``lock_name``, polling until a deadline.

    Args:
        lock_name: Redis key that represents the lock.
        acquire_timeout: Maximum number of seconds to keep retrying before
            giving up.
        lock_timeout: Number of seconds after which Redis expires the key
            so a crashed holder cannot block others forever. Defaults to
            ``acquire_timeout``, which matches the previous behaviour where
            both durations were the same value.

    Returns:
        The unique identifier stored under the key on success (pass it to
        ``release_lock``), or ``False`` if the lock could not be obtained
        before the deadline.
    """
    identifier = str(uuid.uuid4())  # unique token identifying this holder
    if lock_timeout is None:
        lock_timeout = acquire_timeout
    # Express the TTL in milliseconds so fractional second values work;
    # Redis requires a strictly positive integer expiry.
    ttl_ms = max(1, int(lock_timeout * 1000))
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + acquire_timeout
    while time.monotonic() < deadline:
        # SET with NX only succeeds when the key does not exist yet.
        if redis_conn.set(lock_name, identifier, px=ttl_ms, nx=True):
            return identifier
        time.sleep(0.1)
    return False
  3. 定义释放锁函数
def release_lock(lock_name, identifier):
    """Release ``lock_name`` only if it is still held under ``identifier``.

    The check and the delete run on one pipeline connection guarded by
    WATCH, so the key is removed only when its value did not change between
    the comparison and the EXEC.

    Args:
        lock_name: Redis key that represents the lock.
        identifier: Token previously returned by ``acquire_lock``.

    Returns:
        ``True`` if the lock was deleted, ``False`` if the key is missing or
        is owned by a different identifier.
    """
    if isinstance(identifier, bytes):
        identifier = identifier.decode()
    with redis_conn.pipeline() as pipe:
        while True:
            try:
                # WATCH must be issued on the same connection that later
                # runs MULTI/EXEC, hence on the pipeline, not the client.
                pipe.watch(lock_name)
                current = pipe.get(lock_name)
                # The client returns bytes unless decode_responses is set;
                # normalise so the comparison with the str token can match.
                if isinstance(current, bytes):
                    current = current.decode()
                if current != identifier:
                    pipe.unwatch()
                    return False
                pipe.multi()
                pipe.delete(lock_name)
                pipe.execute()
                return True
            except redis.WatchError:
                # The key changed between WATCH and EXEC; re-check ownership.
                continue

使用样例:

import time
import uuid
import threading


def worker():
    """Acquire 'mylock', hold it for five seconds, then release it.

    Prints progress messages tagged with the current thread's name. The
    release happens in a ``finally`` clause so the lock is handed back even
    if the critical section raises, and the "released" message is printed
    only when ``release_lock`` reports success.
    """
    name = threading.current_thread().name
    lock = acquire_lock('mylock')
    if not lock:
        print('process {} failed to acquire lock'.format(name))
        return

    print('process {} acquired lock'.format(name))
    try:
        time.sleep(5)  # simulated critical section
    finally:
        released = release_lock('mylock', lock)

    if released:
        print('process {} released lock'.format(name))
    else:
        # The key was missing or owned by someone else at release time.
        print('process {} could not release lock'.format(name))


if __name__ == '__main__':
    # Create the ten worker threads, then launch them in creation order.
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()

    # Block until every worker has finished.
    for t in threads:
        t.join()

执行结果(示例,实际输出的顺序与成功获取锁的线程数取决于线程调度和超时设置):

process Thread-1 acquired lock
process Thread-7 failed to acquire lock
process Thread-2 failed to acquire lock
process Thread-9 failed to acquire lock
process Thread-3 failed to acquire lock
process Thread-4 failed to acquire lock
process Thread-8 failed to acquire lock
process Thread-6 failed to acquire lock
process Thread-5 failed to acquire lock
process Thread-10 failed to acquire lock
process Thread-1 released lock

相关文章