Python中如何实现基于队列的分布式锁
基于队列的分布式锁,通常会用到Redis作为队列的数据存储和消息传递的中间件。具体实现方式如下:
- 创建Redis连接
import redis redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
- 定义加锁函数
def acquire_lock(lock_name, acquire_timeout=10): """尝试获取锁""" identifier = str(uuid.uuid4()) # 生成一个唯一的标识符 end_time = time.time() + acquire_timeout while time.time() < end_time: if redis_conn.set(lock_name, identifier, ex=acquire_timeout, nx=True): return identifier time.sleep(0.1) return False
- 定义释放锁函数
def release_lock(lock_name, identifier): """尝试释放锁""" while True: redis_conn.watch(lock_name) # 开启事务,监听锁 if redis_conn.get(lock_name) == identifier: # 如果锁是当前线程持有的,则删除锁 with redis_conn.pipeline() as pipe: pipe.multi() pipe.delete(lock_name) pipe.execute() return True redis_conn.unwatch() break return False
使用样例:
import time import uuid import threading def worker(): lock = acquire_lock('mylock') if lock: print('process {} acquired lock'.format(threading.current_thread().name)) time.sleep(5) release_lock('mylock', lock) print('process {} released lock'.format(threading.current_thread().name)) else: print('process {} failed to acquire lock'.format(threading.current_thread().name)) if __name__ == '__main__': threads = [] for i in range(10): thread = threading.Thread(target=worker) threads.append(thread) thread.start() for thread in threads: thread.join()
执行结果:
process Thread-1 acquired lock process Thread-7 failed to acquire lock process Thread-2 failed to acquire lock process Thread-9 failed to acquire lock process Thread-3 failed to acquire lock process Thread-4 failed to acquire lock process Thread-8 failed to acquire lock process Thread-6 failed to acquire lock process Thread-5 failed to acquire lock process Thread-10 failed to acquire lock process Thread-1 released lock
相关文章