1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| from celery.app.task import Task
from celery.utils.log import get_task_logger
from django.core.cache import cache
logger = get_task_logger(__name__)
class SingletonTask(Task):
def __call__(self, *args, **kwargs):
self.lock_key = self.resolve_lock_key(*args, **kwargs)
if not self.request.is_eager and self.lock_key:
# ロックキーが指定されて、 非同期モードであればシングルトンで動作させる
return self.call_singleton(*args, **kwargs)
# それ以外はデフォルトの動作
return super().__call__(*args, **kwargs)
def resolve_lock_key(self, *args, **kwargs):
""" taskに `singleton` キーワード変数でキーが指定されていたらシングルトンモード """
singleton = kwargs.get("singleton", None)
return singleton and f"{self.name}-{singleton}"
def prepare_execute(self, *args, **kwargs):
""" シングルトンタスクの実行の前に何かする"""
pass
def call_singleton(self, *args, **kwargs):
lock = cache.lock(self.lock_key)
if not lock.acquire(blocking=False):
# 取得できなかったら何もしない(同じような冪等処理がすでに動いている)
logger.info("{} failed to lock:".format(self.lock_key))
return "SKIPPED"
self.prepare_execute(*args, **kwargs)
try:
# 実際のタスクを実行
return super(SingletonTask, self).__call__(*args, **kwargs)
except Exception as e:
logger.error(f"task faiild:{e}")
lock.release()
raise e
finally:
lock.release()
|