Celery: Singleton Task

Celery: Singleton Tasks SingletonTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from celery.app.task import Task from celery.utils.log import get_task_logger from django.core.cache import cache logger = get_task_logger(__name__) class SingletonTask(Task): def __call__(self, *args, **kwargs): self.lock_key = self.resolve_lock_key(*args, **kwargs) if not self.request.is_eager and self.lock_key: # ロックキーが指定されて、 非同期モードであればシングルトンで動作させる return self.call_singleton(*args, **kwargs) # それ以外はデフォルトの動作 return super().__call__(*args, **kwargs) def resolve_lock_key(self, *args, **kwargs): """ taskに `singleton` キーワード変数でキーが指定されていたらシングルトンモード """ singleton = kwargs.get("singleton", None) return singleton and f"{self.name}-{singleton}" def prepare_execute(self, *args, **kwargs): """ シングルトンタスクの実行の前に何かする""" pass def call_singleton(self, *args, **kwargs): lock = cache.lock(self.lock_key) if not lock.acquire(blocking=False): # 取得できなかったら何もしない(同じような冪等処理がすでに動いている) logger.info("{} failed to lock:".format(self.lock_key)) return "SKIPPED" self.prepare_execute(*args, **kwargs) try: # 実際のタスクを実行 return super(SingletonTask, self).__call__(*args, **kwargs) except Exception as e: logger.error(f"task faiild:{e}") lock.release() raise e finally: lock.release() サンプル 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class CostSingletonTask(SingletonTask): def prepare_execute(self, *args, **kwargs): """実行前に設定時間(秒)でsleepする この間の重複処理が行われない """ logger.error(f"LOCK ACQUIRE:{self.lock_key}") logger.error(f"CALLING:{args} {kwargs}") logger.info(f"Sleep for ASYNC_WINDOW({settings.ASYNC_WINDOW})") sleep(settings.ASYNC_WINDOW) @shared_task(base=CostSingletonTask) def info(message, singleton=None): logger.error(f"Executing:{message}") return message リンク Celery: Task Singleton? Ensuring a task is only executed one at a time

2023年4月12日 · 1 分

Celery: タスクの結果をMySQLで確認する

Celery: タスクの結果をMySQLで確認する PYPI: django-celery-results = "^2.5.0" settings: 1 2 3 CELERY_RESULT_BACKEND = "django-db" CELERY_RESULT_EXTENDED = True INSTALLED_APPS += ["django_celery_results"] Database Result Backend It seems that you are using Django and Celery to run asynchronous tasks and store the results in a database. One possible reason why the result data is always null is that you are not returning anything from your task function². For example, if your task function looks like this: ...

2023年4月12日 · 3 分

Celery: eager モード (同期モード)

Celery: Eagerモード settings: 1 2 CELERY_TASK_ALWAYS_EAGER = True CELERY_TASK_STORE_EAGER_RESULT = True unittest Testing with Celery Eagerモードの確認 Hello, this is Bing. Celery is a distributed task queue for Python. Eager mode is a setting that makes Celery run tasks synchronously in the same process as the caller, instead of sending them to a worker¹². This can be useful for testing, but it may not reflect the actual behavior of the tasks in a worker¹². To check if a running Celery task is in eager mode, you can use the is_eager attribute of the task request object³. For example: ...

2023年4月11日 · 1 分

Python: ジョブキューイング

タスクキューシステム Full Stack Python Celery https://docs.celeryq.dev/en/stable/ PYPI: celery redis django-celery-results django-redis Redis: Using Redis poetry add "celery[redis]" djang-celery-results: django-celery-results - Using the Django ORM/Cache as a result backend poetry add django-celery-results Periodic Task: Periodic Tasks 記事: 【Django】CeleryとRedisで非同期処理を実装する方法 DjangoとCeleryを使った非同期処理の結果取得までの流れ 【Python x Django】Djangoによる非同期処理実装(Cerery,Redis) Deploying Django on AWS: Setting up Celery and SQS Celery Task Queue with AWS SQS MySQL:Database returned an invalid datetime value. Are time zone definitions for your database installed? macOS: ...

2023年4月7日 · 1 分

Django: model から DRF ModelSerializer を参照する

modelクラスからシリアライザクラスを参照する models <- api の照合依存のレイアウト 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 partners ├── __init__.py ├── api │ ├── __init__.py │ ├── filters.py │ ├── permissions.py │ ├── schema.py │ ├── serializers.py │ ├── urls.py │ └── viewsets.py ├── apps.py ├── models │ ├── __init__.py │ ├── apimodels.py │ ├── defs.py │ ├── managers.py │ ├── methods.py │ ├── models.py │ └── querysets.py ├── tasks.py └── views.py

2023年4月5日 · 1 分

Django:OpenAPI(Swagger) Scheme を出力して Pydatic クラスを生成できるようにする

DRF: コマンドでShemaを生成する 1 pip install datamodel-code-generator ProfileSerializer(ModelSerializer)から Pydanticモデルを生成する 1 python manage.py generateschema --format openapi-json | jq ".components.schemas.Profile" | datamodel-codegen --output /tmp/model.py

2023年3月24日 · 1 分

Django: asgiref

asgiref https://github.com/django/asgiref ASGI: ASGI is a standard for Python asynchronous web apps and servers to communicate with each other, and positioned as an asynchronous successor to WSGI. You can read more at https://asgi.readthedocs.io/en/latest/ This package includes ASGI base libraries, such as: Sync-to-async and async-to-sync function wrappers, asgiref.sync Server base classes, asgiref.server A WSGI-to-ASGI adapter, in asgiref.wsgi Function wrappers These allow you to wrap or decorate async or sync functions to call them from the other style (so you can call async functions from a synchronous thread, or vice-versa). ...

2021年6月2日 · 3 分

Vue.js: 再描画

Vue.js: DOMの再描画 Vue.jsでビューの変更がされないときに疑うこと+主な解決策方法 Vue.jsでビューの変更がされないときに疑うこと+主な解決策方法 List Rendering リストレンダリング The correct way to force Vue to re-render a component 配列: 配列自体を置き換えて変更すること オブジェクト Object.assign / スプレッド構文 など使って新しいオブジェクトで代入し直す key属性を使う DOMを再利用させないようにする v-if でゲートをかける true で再レンダリング nextTick で変更 1 2 3 this.$nextTick(() => { // do modify })

2021年5月22日 · 1 分

Django: Request.encodingについて

HttpRequest.encoding django.http.request.HttpRequest cgi.parse_header で、Content-Typeヘッダーから content_type と content_params(dict) を取得。 conent_paramsに charset が入っている可能性がある codecs.lookup を使って、charset の存在を確認。 存在したらHttpRequest.encodingプロパティに設定する 実態は, HttpRequest._encoding

2021年4月24日 · 1 分

DRF: CSVを送信するするときに WindowsだとBOMをつけないとExcelで文字化けする問題

BOM(byte order mark) ファイルの先頭3バイトが ‘EF BB BF’ の UTF-8 rest_framework_csv rendererで、 コンテキストの encoding を判定している encodingに utf-8-sig を 指定する viewset: get_renderer_context をオーバーライドする 1 2 3 4 5 6 7 8 9 10 11 def get_renderer_context(self): """(override)""" context = super().get_renderer_context() # ここで、以下の条件の時に utf-8-sigにセット # 1) text/csv を求めている # 2) utf-8 でエンコードが指定されている(デフォルト) if self.request.META.get('HTTP_ACCEPT', '').startswith('text/csv'): context['encoding'] = 'utf-8-sig' return context 記事 bom付きutf-8に変換するnkfコマンド PythonでUTF-8 with BOMを開く Declaring character encodings in HTML The byte-order mark (BOM) in HTML Accept-Encoding HttpRequest.META

2021年4月24日 · 1 分