Celery: Singleton Tasks SingletonTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 from celery.app.task import Task from celery.utils.log import get_task_logger from django.core.cache import cache logger = get_task_logger(__name__) class SingletonTask(Task): def __call__(self, *args, **kwargs): self.lock_key = self.resolve_lock_key(*args, **kwargs) if not self.request.is_eager and self.lock_key: # ロックキーが指定されて、 非同期モードであればシングルトンで動作させる return self.call_singleton(*args, **kwargs) # それ以外はデフォルトの動作 return super().__call__(*args, **kwargs) def resolve_lock_key(self, *args, **kwargs): """ taskに `singleton` キーワード変数でキーが指定されていたらシングルトンモード """ singleton = kwargs.get("singleton", None) return singleton and f"{self.name}-{singleton}" def prepare_execute(self, *args, **kwargs): """ シングルトンタスクの実行の前に何かする""" pass def call_singleton(self, *args, **kwargs): lock = cache.lock(self.lock_key) if not lock.acquire(blocking=False): # 取得できなかったら何もしない(同じような冪等処理がすでに動いている) logger.info("{} failed to lock:".format(self.lock_key)) return "SKIPPED" self.prepare_execute(*args, **kwargs) try: # 実際のタスクを実行 return super(SingletonTask, self).__call__(*args, **kwargs) except Exception as e: logger.error(f"task faiild:{e}") lock.release() raise e finally: lock.release() サンプル 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class CostSingletonTask(SingletonTask): def prepare_execute(self, *args, **kwargs): """実行前に設定時間(秒)でsleepする この間の重複処理が行われない """ logger.error(f"LOCK ACQUIRE:{self.lock_key}") logger.error(f"CALLING:{args} {kwargs}") logger.info(f"Sleep for ASYNC_WINDOW({settings.ASYNC_WINDOW})") sleep(settings.ASYNC_WINDOW) @shared_task(base=CostSingletonTask) def info(message, singleton=None): logger.error(f"Executing:{message}") return message リンク Celery: Task Singleton? Ensuring a task is only executed one at a time