Celery: Singleton Task

Celery: Singleton Tasks SingletonTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from celery.app.task import Task from celery.utils.log import get_task_logger from django.core.cache import cache logger = get_task_logger(__name__) class SingletonTask(Task): def __call__(self, *args, **kwargs): self.lock_key = self.resolve_lock_key(*args, **kwargs) if not self.request.is_eager and self.lock_key: # ロックキーが指定されて、 非同期モードであればシングルトンで動作させる return self.call_singleton(*args, **kwargs) # それ以外はデフォルトの動作 return super().__call__(*args, **kwargs) def resolve_lock_key(self, *args, **kwargs): """ taskに `singleton` キーワード変数でキーが指定されていたらシングルトンモード """ singleton = kwargs.get("singleton", None) return singleton and f"{self.name}-{singleton}" def prepare_execute(self, *args, **kwargs): """ シングルトンタスクの実行の前に何かする""" pass def call_singleton(self, *args, **kwargs): lock = cache.lock(self.lock_key) if not lock.acquire(blocking=False): # 取得できなかったら何もしない(同じような冪等処理がすでに動いている) logger.info("{} failed to lock:".format(self.lock_key)) return "SKIPPED" self.prepare_execute(*args, **kwargs) try: # 実際のタスクを実行 return super(SingletonTask, self).__call__(*args, **kwargs) except Exception as e: logger.error(f"task faiild:{e}") lock.release() raise e finally: lock.release() サンプル 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class CostSingletonTask(SingletonTask): def prepare_execute(self, *args, **kwargs): """実行前に設定時間(秒)でsleepする この間の重複処理が行われない """ logger.error(f"LOCK ACQUIRE:{self.lock_key}") logger.error(f"CALLING:{args} {kwargs}") logger.info(f"Sleep for ASYNC_WINDOW({settings.ASYNC_WINDOW})") sleep(settings.ASYNC_WINDOW) @shared_task(base=CostSingletonTask) def info(message, singleton=None): logger.error(f"Executing:{message}") return message リンク Celery: Task Singleton? Ensuring a task is only executed one at a time

2023年4月12日 · 1 分

Celery: supervisord

Celery: supervisord conf 1 2 3 4 5 6 7 8 9 10 11 12 [program:epm-tasks] directory=/home/ubuntu/projects/epm/web user=ubuntu command=/home/ubuntu/.anyenv/envs/pyenv/versions/coresys/bin/celery -A app worker -l INFO -f /home/ubuntu/projects/epm/logs/tasks.log autostart=true autorestart=true ;stdout_logfile=syslog ;stderr_logfile=syslog numprocs=1 startsecs=10 stopwaitsecs = 600 killasgroup=true -f オプションで指定すると user の所有権でファイル作成 stdout_logfile / stderr_logfile で指定するとsupervisord ユーザー(root)の所有権で作成 log rotation It seems that you are using supervisord to manage Celery processes and you want to rotate the log files for Celery on a daily basis. One way to do this is to use logrotate, a tool that can rotate and compress log files according to a configuration file². To use logrotate, you need to do the following steps: ...

2023年4月12日 · 2 分

Celery: タスクの結果をMySQLで確認する

Celery: タスクの結果をMySQLで確認する PYPI: django-celery-results = "^2.5.0" settings: 1 2 3 CELERY_RESULT_BACKEND = "django-db" CELERY_RESULT_EXTENDED = True INSTALLED_APPS += ["django_celery_results"] Database Result Backend It seems that you are using Django and Celery to run asynchronous tasks and store the results in a database. One possible reason why the result data is always null is that you are not returning anything from your task function². For example, if your task function looks like this: ...

2023年4月12日 · 3 分

Celery: eager モード (同期モード)

Celery: Eagerモード settings: 1 2 CELERY_TASK_ALWAYS_EAGER = True CELERY_TASK_STORE_EAGER_RESULT = True unittest Testing with Celery Eagerモードの確認 Hello, this is Bing. Celery is a distributed task queue for Python. Eager mode is a setting that makes Celery run tasks synchronously in the same process as the caller, instead of sending them to a worker¹². This can be useful for testing, but it may not reflect the actual behavior of the tasks in a worker¹². To check if a running Celery task is in eager mode, you can use the is_eager attribute of the task request object³. For example: ...

2023年4月11日 · 1 分

tmux: バッファーの全テキストを選択

tmux: バッファーの全テキストを選択 viキーバインド前提: コピーモードに入る(:copy-mode) g でバッファーの先頭へ移動 Spaceでバッファのコピー開始 G でバッファーの最後へ移動 Enterでバッファーのコピー あとは、:paste-buffer で vi とかに貼り付けする。 Edge To copy all text in the current tmux screen, you need to enter copy-mode, select the entire scrollback, and yank it into the tmux buffer. Then you can paste it in another tmux pane or window. The exact keys you need to press may vary depending on your mode-keys option (vi or emacs) and your system (Mac or Linux). For example, if you use vi mode-keys on Linux, you can press Ctrl + b followed by [ to enter copy-mode, then press g to go to the top of the scrollback, then press Ctrl + Space to start copying, then press G to go to the bottom of the scrollback, then press Alt + w or Ctrl + w to yank the text into the tmux buffer. To paste the text, you can press Ctrl + b followed by ]. Does this answer your question? ...

2023年4月11日 · 1 分

Python: ジョブキューイング

タスクキューシステム Full Stack Python Celery https://docs.celeryq.dev/en/stable/ PYPI: celery redis django-celery-results django-redis Redis: Using Redis poetry add "celery[redis]" djang-celery-results: django-celery-results - Using the Django ORM/Cache as a result backend poetry add django-celery-results Periodic Task: Periodic Tasks 記事: 【Django】CeleryとRedisで非同期処理を実装する方法 DjangoとCeleryを使った非同期処理の結果取得までの流れ 【Python x Django】Djangoによる非同期処理実装(Cerery,Redis) Deploying Django on AWS: Setting up Celery and SQS Celery Task Queue with AWS SQS MySQL:Database returned an invalid datetime value. Are time zone definitions for your database installed? macOS: ...

2023年4月7日 · 1 分

MySQL: ERROR 3780 (HY000) at line 1406622: Referencing column 'os_branch_id' and referenced column 'id' in foreign key constraint '***' are incompatible.

ERROR 3780 (HY000) at line 1406622: Referencing column 'os_branch_id' and referenced column 'id' in foreign key constraint '***' are incompatible. 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE `gas_bombehouse` ( `id` int NOT NULL AUTO_INCREMENT, `code` varchar(50) NOT NULL, .... `os_branch_id` bigint DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `code` (`code`), KEY `gas_bombehouse_os_branch_id_d35d17d3_fk_outsource` (`os_branch_id`), CONSTRAINT `gas_bombehouse_os_branch_id_d35d17d3_fk` FOREIGN KEY (`os_branch_id`) REFERENCES `outsources_outsourcebranch` (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3; /*!40101 SET character_set_client = @saved_cs_client */; 1 2 3 4 5 6 7 CREATE TABLE `outsources_outsourcebranch` ( `id` bigint NOT NULL AUTO_INCREMENT, ... PRIMARY KEY (`id`), ) ENGINE=InnoDB AUTO_INCREMENT=9751 DEFAULT CHARSET=utf8mb3; /*!40101 SET character_set_client = @saved_cs_client */; 原因 リモートの mysqldump を ローカルにそのまま取り込もうとしていた 古いスキーマが残っていて、そのidがint であったため。 DROP TABLE IF EXISTS table CREATE TABLE table が実行されるまで、古いスキーマが残っている、ということ。 1 2 3 4 5 DROP TABLE IF EXISTS `outsources_outsourcebranch`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!50503 SET character_set_client = utf8mb4 */; CREATE TABLE `outsources_outsourcebranch` .... 解決 DROP DATABASE & CREATE DATABASE して、更地にmysqldumpをロードする 資料 How to fix MySQL error 1215 Cannot add foreign key constraint ...

2023年4月6日 · 2 分

PYPI

cryptgraphy/cffi M1 Mac に cryptgraphy ( cffi ) をインストールする

2023年4月6日 · 1 分

Django: model から DRF ModelSerializer を参照する

modelクラスからシリアライザクラスを参照する models <- api の照合依存のレイアウト 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 partners ├── __init__.py ├── api │ ├── __init__.py │ ├── filters.py │ ├── permissions.py │ ├── schema.py │ ├── serializers.py │ ├── urls.py │ └── viewsets.py ├── apps.py ├── models │ ├── __init__.py │ ├── apimodels.py │ ├── defs.py │ ├── managers.py │ ├── methods.py │ ├── models.py │ └── querysets.py ├── tasks.py └── views.py

2023年4月5日 · 1 分

Python: ThreadPoolExecutor: マルチスレッド

Python: ThreadPoolExecutor: マルチスレッド 1 2 3 4 5 6 7 8 9 def bind_all(self): def _bind(instance): instance.bind_to_debt() executor = ThreadPoolExecutor(max_workers=4) for i in self.all(): executor.submit(_bind, i) executor.shutdown() # 全てのスレッドが終わるのを待つ

2023年3月30日 · 1 分