Celery: Singleton Tasks

SingletonTask

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from celery.app.task import Task
from celery.utils.log import get_task_logger
from django.core.cache import cache

logger = get_task_logger(__name__)


class SingletonTask(Task):
    def __call__(self, *args, **kwargs):
        self.lock_key = self.resolve_lock_key(*args, **kwargs)
        if not self.request.is_eager and self.lock_key:
            # ロックキーが指定されて、 非同期モードであればシングルトンで動作させる
            return self.call_singleton(*args, **kwargs)
        # それ以外はデフォルトの動作
        return super().__call__(*args, **kwargs)

    def resolve_lock_key(self, *args, **kwargs):
        """ taskに `singleton` キーワード変数でキーが指定されていたらシングルトンモード """
        singleton = kwargs.get("singleton", None)
        return singleton and f"{self.name}-{singleton}"

    def prepare_execute(self, *args, **kwargs):
        """ シングルトンタスクの実行の前に何かする"""
        pass

    def call_singleton(self, *args, **kwargs):
        lock = cache.lock(self.lock_key)
        if not lock.acquire(blocking=False):
            # 取得できなかったら何もしない(同じような冪等処理がすでに動いている)
            logger.info("{} failed to lock:".format(self.lock_key))
            return "SKIPPED"

        self.prepare_execute(*args, **kwargs)

        try:
            # 実際のタスクを実行
            return super(SingletonTask, self).__call__(*args, **kwargs)
        except Exception as e:
            logger.error(f"task faiild:{e}")
            lock.release()
            raise e
        finally:
            lock.release()

サンプル

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class CostSingletonTask(SingletonTask):
    def prepare_execute(self, *args, **kwargs):
        """実行前に設定時間(秒)でsleepする
        この間の重複処理が行われない
        """
        logger.error(f"LOCK ACQUIRE:{self.lock_key}")
        logger.error(f"CALLING:{args} {kwargs}")

        logger.info(f"Sleep for ASYNC_WINDOW({settings.ASYNC_WINDOW})")
        sleep(settings.ASYNC_WINDOW)


@shared_task(base=CostSingletonTask)
def info(message, singleton=None):
    logger.error(f"Executing:{message}")
    return message

リンク