Celery

概要 Redis や RabbitMQ をブローカーに、非同期タスク実行・定期タスク(beat)を実現。ECS Fargate 上では worker/beat を独立タスクとして実行。ログは CloudWatch Logs に出力。 関連ページ Redis — Celery のブローカー/バックエンド ソース記事 Celery — 2023-04 Celery on ECS — 2023-07

2026年4月6日 · 1 分

Redis

概要 インメモリストレージで、Memcached より豊富なデータ構造(List・Set・Sorted Set・Stream)対応。Django キャッシング・Celery ブローカー・セッションストアとして広く活用。ElastiCache クラスターモードでシャーディング・高可用性確保。 分散ロック Lua スクリプトによる複数コマンドのアトミック実行で競合状態を回避。フェンシングトークンで堅牢なロック実装が可能。 関連ページ 分散ロック — Redis を使った排他制御 Celery — Redis をブローカーとして利用 ソース記事 Redis — 2023-05 Redis フェンシングロック — 2026-03

2026年4月6日 · 1 分

分散ロック

概要 Redis-py の Lock クラスは UUID ベースのトークン管理を提供。フェンシングトークン(単調増加する数値)を実装することで、GC pause による False Positive を防止する堅牢な分散ロックが実現可能。Lua スクリプトでアトミック性を保証。 関連ページ Redis — 分散ロックの基盤 ソース記事 Redis フェンシングロック — 2026-03 Django Cache Lock — 2024-01

2026年4月6日 · 1 分

redis-py の Lock をサブクラス化してフェンシングトークンを実装する

redis-py の Lock クラスは UUID ベースのトークンでロックの所有権を管理するが、フェンシングトークン(単調増加する数値)は提供しない。しかし、Lock クラスは do_acquire や Lua スクリプトをオーバーライドできる設計になっており、サブクラス化でフェンシングトークンを追加できる。 本記事では、redis-py の Lock を拡張してフェンシングトークンを発行する FencedLock クラスの実装例を紹介する。 前提知識:Redis の Lua スクリプティング Redis はバージョン 2.6 から Lua スクリプトの実行機能を内蔵している。EVAL コマンドで Lua スクリプトを Redis サーバー上で直接実行でき、複数の Redis コマンドをアトミック(不可分)に実行できる。 なぜ Lua スクリプトが必要か 通常、Redis コマンドは1つずつ実行される。例えば「キーが存在しなければセットし、同時にカウンターをインクリメントする」という処理を2つのコマンドで行うと、その間に他のクライアントが割り込む可能性がある: クライアント A: SET mykey value NX → 成功 ← クライアント B が割り込む余地 クライアント A: INCR counter → インクリメント Lua スクリプトを使えば、この2つの操作を1回のアトミックな呼び出しにまとめられる: 1 2 3 4 5 6 -- Redis サーバー上で実行される(他のコマンドは割り込めない) local ok = redis.call('SET', KEYS[1], ARGV[1], 'NX') if ok then return redis.call('INCR', KEYS[2]) end return nil Redis CLI での実行例 1 2 # EVAL "スクリプト" キーの数 キー1 キー2 ... 引数1 引数2 ... redis-cli EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey myvalue redis-py での実行例 1 2 3 4 5 6 7 8 9 10 import redis r = redis.Redis() # 方法1: eval で直接実行 r.eval("return redis.call('SET', KEYS[1], ARGV[1])", 1, "mykey", "myvalue") # 方法2: register_script で事前登録(推奨) # サーバー側に SHA1 でキャッシュされ、2回目以降はスクリプト本文の転送が不要 script = r.register_script("return redis.call('GET', KEYS[1])") result = script(keys=["mykey"]) セキュリティ上の注意 Lua スクリプトのパラメータは KEYS[] と ARGV[] で渡される。SQL のプリペアドステートメントと同様に、パラメータが文字列としてスクリプトに展開されることはないため、パラメータ経由でのインジェクションはできない。ただし、ユーザー入力でスクリプト文字列自体を動的に組み立てると危険なので、スクリプトは固定文字列として定義すること。 ...

2026年3月17日 · 4 分

Redisを「共有状態」として使うアンチパターン:キー設計の落とし穴

Redis はキャッシュとして非常に優秀なツールだが、複数のチームやサービスが**共有状態(shared state)**として Redis を使い始めると、設計上の問題が発生しやすくなる。 キャッシュと共有状態の違い Redis をキャッシュとして使う場合、データは一時的なものであり、いつ消えても問題ない。元データは RDB などに存在し、キャッシュミス時に再構築できる。 一方、共有状態として使う場合は話が変わる。複数のサービスが同じ Redis キーを読み書きし、そのデータが「正」として扱われる。RDB のようなスキーマや制約がないため、以下の問題が起きやすい。 暗黙の契約に依存したデータ構造 RDB であればスキーマによってデータ構造が明示的に定義される。カラム名、型、制約、外部キーなどが設計書の役割を果たす。 Redis にはそのような仕組みがない。キーの命名規則やデータ形式は開発者間の「暗黙の契約」に依存する。チームが増えると、以下のような問題が顕在化する: キーの命名が衝突する — 異なるチームが同じプレフィックスを使ってしまう データ形式の不一致 — あるサービスは JSON、別のサービスは MessagePack で書き込む バージョン管理の欠如 — データ構造を変更しても、読み取り側が追従できない 「削除できないキー」問題 最も厄介な問題の一つが、誰が所有しているのか分からないキーが残り続けることだ。 本番環境で以下のような状況が発生する: # このキーは誰が作った?いつ expire する?削除していい? GET user:session:abc123:metadata 作成したサービスがすでに廃止されている TTL が設定されていないため、永遠に残る 他のサービスが依存している可能性があり、安易に削除できない キーを「パブリック API」として扱う この問題に対する実践的なアプローチとして、Redis キーをパブリック API のように扱うという考え方がある: バージョニング — キー名にバージョンを含める(例: v2:user:session:{id}) ドキュメント化 — どのキーがどのサービスによって管理されているかを明文化する オーナーの明確化 — 各キーに責任を持つチーム・サービスを割り当てる TTL の必須化 — 共有キーには必ず TTL を設定し、期限切れを明示する 補足:分散ロック基盤としての Redis Redis を共有状態として使うもう一つの典型例が、トランザクション境界をまたぐ分散ロックだ。SET key value NX PX timeout を使ったロックや、Redlock アルゴリズムは広く利用されているが、ここにも落とし穴がある。 ...

2026年3月9日 · 3 分

Redis Pub/Sub から Streams への移行で帯域 99% 削減 --- 同時接続 30 万超チャットの実践記録

Redis Pub/Sub から Streams への移行で帯域 99% 削減 — 同時接続 30 万超チャットの実践記録 Keisuke Nishitani 氏(@Keisuke69)のポストで、LY Corp(旧 LINE)の技術ブログ記事が紹介されていました。同時接続数 30 万超の LINE 公式アカウント(OA)チャットが、メッセージ配信基盤を Redis Cluster の Pub/Sub から Redis Streams へ移行した事例です。ピーク時 1.5 Gbps だったノードあたりの帯域が 11 Mbps まで削減されたという結果は、大規模リアルタイムシステムを運用するエンジニアにとって示唆に富む内容です。 同時接続数30万超のチャットサービスのメッセージ配信基盤をRedis Pub/SubからRedis Streamsにした話 — @Keisuke69 背景 — Redis Cluster Pub/Sub のスケール限界 LINE 公式アカウントのチャット機能(OA チャット)は、ユーザーから送られたメッセージを OA オーナーにリアルタイムで配信する仕組みを持っています。この配信基盤として Redis Cluster の Pub/Sub を使用していました。 問題は Redis Cluster における Pub/Sub の仕様にあります。あるシャードに publish されたメッセージは、クラスター内の全シャードにブロードキャストされます。24 シャード構成であれば、1 メッセージが残り 23 シャードに伝搬するため、アウトバウンドトラフィックはインバウンドの 23 倍になります。 指標 値 クラスター構成 24 シャード / 48 ノード ノードあたり帯域(平常時) 500 Mbps ノードあたり帯域(ピーク時) 1.5 Gbps シャードを増やせばクラスター性能は上がりますが、同時にブロードキャストのトラフィックも増えるというジレンマがあり、スケールアウトが頭打ちになっていました。 ...

2026年3月2日 · 3 分

django-redis: lock

djanog-redis: cache.lock lock https://github.com/jazzband/django-redis/blob/d94a7f9644b96cc37743914fce899cca942c032a/django_redis/cache.py#L177 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class RedisCache(BaseCache): ... @omit_exception def lock(self, *args, **kwargs): return self.client.lock(*args, **kwargs) def __init__(self, server: str, params: Dict[str, Any]) -> None: ... self._client_cls = options.get( "CLIENT_CLASS", "django_redis.client.DefaultClient" ) self._client_cls = import_string(self._client_cls) self._client = None ... DefaultClient https://github.com/jazzband/django-redis/blob/d94a7f9644b96cc37743914fce899cca942c032a/django_redis/client/default.py#L29 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 from redis import Redis class DefaultClient: def __init__(self, server, params: Dict[str, Any], backend: BaseCache) -> None: ... self._clients: List[Optional[Redis]] = [None] * len(self._server) ... def lock( self, key: KeyT, version: Optional[int] = None, timeout: Optional[float] = None, sleep: float = 0.1, blocking_timeout: Optional[float] = None, client: Optional[Redis] = None, thread_local: bool = True, ): if client is None: client = self.get_client(write=True) key = self.make_key(key, version=version) return client.lock( key, timeout=timeout, sleep=sleep, blocking_timeout=blocking_timeout, thread_local=thread_local, ) def get_client( self, write: bool = True, tried: Optional[List[int]] = None, ) -> Redis: """ Method used for obtain a raw redis client. This function is used by almost all cache backend operations for obtain a native redis client/connection instance. """ index = self.get_next_client_index(write=write, tried=tried) if self._clients[index] is None: self._clients[index] = self.connect(index) return self._clients[index] # type:ignore Redis https://github.com/redis/redis-py/blob/6d77c6d715430c30f22147f8c572659d77380a9f/redis/client.py#L88 lock: https://github.com/redis/redis-py/blob/6d77c6d715430c30f22147f8c572659d77380a9f/redis/client.py#L439C1-L511C10 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): def lock( self, name: str, timeout: Optional[float] = None, sleep: float = 0.1, blocking: bool = True, blocking_timeout: Optional[float] = None, lock_class: Union[None, Any] = None, thread_local: bool = True, ): """ Return a new Lock object using key ``name`` that mimics the behavior of threading.Lock. If specified, ``timeout`` indicates a maximum life for the lock. By default, it will remain locked until release() is called. ``sleep`` indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock. ``blocking`` indicates whether calling ``acquire`` should block until the lock has been acquired or to fail immediately, causing ``acquire`` to return False and the lock not being acquired. Defaults to True. Note this value can be overridden by passing a ``blocking`` argument to ``acquire``. ``blocking_timeout`` indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of ``None`` indicates continue trying forever. ``blocking_timeout`` can be specified as a float or integer, both representing the number of seconds to wait. ``lock_class`` forces the specified lock implementation. Note that as of redis-py 3.0, the only lock class we implement is ``Lock`` (which is a Lua-based lock). So, it's unlikely you'll need this parameter, unless you have created your own custom lock class. ``thread_local`` indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline: time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. thread-1 sets the token to "abc" time: 1, thread-2 blocks trying to acquire `my-lock` using the Lock instance. time: 5, thread-1 has not yet completed. redis expires the lock key. time: 5, thread-2 acquired `my-lock` now that it's available. thread-2 sets the token to "xyz" time: 6, thread-1 finishes its work and calls release(). if the token is *not* stored in thread local storage, then thread-1 would see the token value as "xyz" and would be able to successfully release the thread-2's lock. In some use cases it's necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn't disabled in this case, the worker thread won't see the token set by the thread that acquired the lock. Our assumption is that these cases aren't common and as such default to using thread local storage.""" if lock_class is None: lock_class = Lock return lock_class( self, name, timeout=timeout, sleep=sleep, blocking=blocking, blocking_timeout=blocking_timeout, thread_local=thread_local, )

2024年1月5日 · 4 分

MPTT tree_id

MPTT Tree ID Django + mysql: transaction.atomic() で select_for_update() を使ってレコードをロックしたときのメモ MySQL で採番機能(シーケンス)を実装する方法を整理する MySQL のロックについて公式ドキュメントを読みながら動作検証してみた〜テーブルレベルロック〜 分散ロック Distributed Locking in Django 1 2 3 4 from django.core.cache import cache with cache.lock("somekey"): do_some_thing() redis-py: https://github.com/redis/redis-py/blob/d3a3ada03e080f39144807c9fbe44876c40e0548/redis/client.py#L394 デッドロック Hook available for automatic retry after deadlock in django and mysql setup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import django.db.backends.utils from django.db import OperationalError import time original = django.db.backends.utils.CursorWrapper.execute def execute_wrapper(*args, **kwargs): attempts = 0 while attempts < 3: try: return original(*args, **kwargs) except OperationalError as e: code = e.args[0] if attempts == 2 or code != 1213: raise e attempts += 1 time.sleep(0.2) django.db.backends.utils.CursorWrapper.execute = execute_wrapper

2023年11月6日 · 1 分

Redis キーの作成時刻

Redis キーの登録日時 Redis でキーを登録した時刻を取得するには、残念ながら直接的な方法はありません。Redis はキーの作成日時や更新日時を保存していないからです ¹。ただし、いくつかの回避策があります。 キーを登録するときに、別のキーにタイムスタンプを保存する。 例えば、SET foo barとするときに、SET foo:timestamp 1636106362というように、キーの名前に:timestampを付けて現在の UNIX 時間を値として保存する。 この方法では、キーの数が増えるとメモリの消費量も増えることに注意が必要です。 キーの値にタイムスタンプを含める。 例えば、SET foo bar:1636106362というように、値の末尾に:と現在の UNIX 時間を付ける。 この方法では、値の取得や操作において、タイムスタンプを除去する必要があることに注意が必要です。 キーに有効期限を設定する。 例えば、EXPIRE foo 3600とすると、キーfooは 1 時間後に削除される。 このとき、TTL fooとすると、キーの残りの有効期限が秒単位で返される。 この値から、キーの登録時刻を逆算することができる。この方法では、キーが削除されると時刻を取得できなくなることに注意が必要です。 以上の方法のいずれも完璧ではありませんが、Redis でキーの登録時刻を取得するための可能な解決策として参考にしてください。 ¹: Redis でキーの作成日時や更新日時を取得する方法はありますか? - スタック・オーバーフロー. ソース: Bing との会話 2023/11/6 (1) redis-cli でよく使うコマンド 20 選 #Redis - Qiita. https://qiita.com/hatsu/items/a52817364160e0b6bb60. (2) redis-cli でよく使うコマンド 20 選 #Redis - Qiita. https://qiita.com/hatsu/items/a52817364160e0b6bb60. (3) 全データ型対応の操作 — redis 2.0.3 documentation - shibu.jp. http://redis.shibu.jp/commandreference/alldata.html. (4) 【2022 年最新版】【Redis メモ・3】キーに有効期限を付与し …. https://www.servernote.net/article.cgi?id=redis-note-3. (5) セッション管理として Redis を使用する - RAKUS Developers Blog …. https://tech-blog.rakus.co.jp/entry/2017/10/17/111828. (6) ja.wikipedia.org. https://ja.wikipedia.org/wiki/Redis.

2023年11月6日 · 1 分

redis

Elasic Cache for Redis バージョン 1 2 3 4 5 $ redis-cli --version redis-cli 5.0.7 $ redis-server -v Redis server v=5.0.7 sha=00000000:0 malloc=jemalloc-5.2.1 bits=64 build=66bd629f924ac924 接続: 1 REDIS_URL=redis://mycloud-stage-redis-nc.sigrvp.ng.0001.apne1.cache.amazonaws.com:6379/0 1 2 3 $ export $(cat .env|xargs) $ redis-cli -u $REDIS_URL PING PONG Python redis-py: https://github.com/redis/redis-py django-redis: https://github.com/jazzband/django-redis channels_redis: https://github.com/django/channels_redis クラスターモードへの接続 Cluster Client 1 2 3 4 5 6 7 8 9 import logging from redis.cluster import RedisCluster ENDPOINT = "mycloud-stage-redis-test.sigrvp.clustercfg.apne1.cache.amazonaws.com" logging.basicConfig(level=logging.INFO) redis = RedisCluster(host=ENDPOINT, port="6379") if redis.ping(): logging.info("Connected to Redis") django-redis: ...

2023年7月14日 · 1 分