django-redis: lock

django-redis: cache.lock

lock
https://github.com/jazzband/django-redis/blob/d94a7f9644b96cc37743914fce899cca942c032a/django_redis/cache.py#L177

```python
class RedisCache(BaseCache):
    ...

    @omit_exception
    def lock(self, *args, **kwargs):
        return self.client.lock(*args, **kwargs)

    def __init__(self, server: str, params: Dict[str, Any]) -> None:
        ...
        self._client_cls = options.get(
            "CLIENT_CLASS", "django_redis.client.DefaultClient"
        )
        self._client_cls = import_string(self._client_cls)
        self._client = None
        ...
```

DefaultClient
https://github.com/jazzband/django-redis/blob/d94a7f9644b96cc37743914fce899cca942c032a/django_redis/client/default.py#L29

```python
from redis import Redis


class DefaultClient:
    def __init__(self, server, params: Dict[str, Any], backend: BaseCache) -> None:
        ...
        self._clients: List[Optional[Redis]] = [None] * len(self._server)
        ...

    def lock(
        self,
        key: KeyT,
        version: Optional[int] = None,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking_timeout: Optional[float] = None,
        client: Optional[Redis] = None,
        thread_local: bool = True,
    ):
        if client is None:
            client = self.get_client(write=True)

        key = self.make_key(key, version=version)
        return client.lock(
            key,
            timeout=timeout,
            sleep=sleep,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )

    def get_client(
        self,
        write: bool = True,
        tried: Optional[List[int]] = None,
    ) -> Redis:
        """
        Method used for obtain a raw redis client.

        This function is used by almost all cache backend
        operations for obtain a native redis client/connection
        instance.
        """
        index = self.get_next_client_index(write=write, tried=tried)

        if self._clients[index] is None:
            self._clients[index] = self.connect(index)

        return self._clients[index]  # type:ignore
```

Redis
https://github.com/redis/redis-py/blob/6d77c6d715430c30f22147f8c572659d77380a9f/redis/client.py#L88

lock:
https://github.com/redis/redis-py/blob/6d77c6d715430c30f22147f8c572659d77380a9f/redis/client.py#L439C1-L511C10

```python
class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage."""
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
        )
```
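The timeline in the `thread_local` part of the docstring can be reproduced without a Redis server. The following is a minimal sketch, assuming a plain dict in place of the server; `ToyLock` and `simulate` are hypothetical names for illustration and are not part of redis-py or django-redis.

```python
import threading
import types
import uuid


class ToyLock:
    """Toy token-based lock backed by a dict; NOT the redis-py implementation."""

    def __init__(self, store, name, thread_local=True):
        self.store = store  # dict standing in for the Redis server
        self.name = name
        # thread_local=True: each thread only sees the token it set itself.
        self.local = threading.local() if thread_local else types.SimpleNamespace()

    def acquire(self):
        if self.name in self.store:
            return False
        token = uuid.uuid4().hex
        self.store[self.name] = token
        self.local.token = token
        return True

    def release(self):
        token = getattr(self.local, "token", None)
        if token is None or self.store.get(self.name) != token:
            raise RuntimeError("cannot release a lock that is not owned")
        del self.store[self.name]
        self.local.token = None


def simulate(thread_local):
    """Return True if thread-1 manages to release the lock held by thread-2."""
    store = {}
    lock = ToyLock(store, "my-lock", thread_local=thread_local)
    t1_acquired, t2_acquired = threading.Event(), threading.Event()
    outcome = {}

    def thread1():
        lock.acquire()  # time 0: thread-1 sets its token
        t1_acquired.set()
        t2_acquired.wait()
        try:
            lock.release()  # time 6: thread-1 finishes and releases
            outcome["released"] = True
        except RuntimeError:
            outcome["released"] = False

    def thread2():
        t1_acquired.wait()
        store.pop("my-lock", None)  # time 5: simulate the key expiring
        lock.acquire()  # time 5: thread-2 sets a new token
        t2_acquired.set()

    threads = [threading.Thread(target=thread1), threading.Thread(target=thread2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcome["released"]
```

With `simulate(True)` thread-1 still sees its own stale token and the toy lock refuses the release; with `simulate(False)` the token is shared, so thread-1 releases the lock that thread-2 holds.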

January 5, 2024 · 4 min

"Your connection is not private"

"Your connection is not private"

[18 methods] How to fix the "Your connection is not private" error

SSL TEST
https://www.ssllabs.com/ssltest/index.html

DNS CAA

Use of DNS CAA (Certification Authority Authorization) has been standardized
Route 53 now supports CAA records!

December 13, 2023 · 1 min

Python with statement

With

[Python] Understanding the structure of the with statement
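A `with` statement calls `__enter__` on entry and `__exit__` on exit, including when the body raises. The sketch below records that order; `Recorder` and `run` are hypothetical names used only for this illustration.

```python
class Recorder:
    """Context manager that logs when it is entered and exited."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self  # bound to the name after "as"

    def __exit__(self, exc_type, exc_value, traceback):
        name = exc_type.__name__ if exc_type else None
        self.events.append(f"exit:{name}")
        return False  # False means the exception, if any, is not suppressed


def run(fail):
    rec = Recorder()
    try:
        with rec as r:
            r.events.append("body")
            if fail:
                raise ValueError("boom")
    except ValueError:
        rec.events.append("caught")
    return rec.events
```

`run(False)` records enter, body, exit; `run(True)` shows that `__exit__` still runs before the exception reaches the surrounding `except`.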

November 9, 2023 · 1 min

MPTT tree_id

MPTT Tree ID

Django + mysql: Notes on locking records with select_for_update() inside transaction.atomic()
Sorting out how to implement a numbering feature (sequence) in MySQL
Verifying MySQL lock behavior while reading the official documentation: table-level locks

Distributed locks

Distributed Locking in Django

```python
from django.core.cache import cache

with cache.lock("somekey"):
    do_some_thing()
```

redis-py: https://github.com/redis/redis-py/blob/d3a3ada03e080f39144807c9fbe44876c40e0548/redis/client.py#L394

Deadlocks

Hook available for automatic retry after deadlock in django and mysql setup

```python
import time

import django.db.backends.utils
from django.db import OperationalError

original = django.db.backends.utils.CursorWrapper.execute


def execute_wrapper(*args, **kwargs):
    # Retry up to 3 times when MySQL reports a deadlock (error code 1213).
    attempts = 0
    while attempts < 3:
        try:
            return original(*args, **kwargs)
        except OperationalError as e:
            code = e.args[0]
            if attempts == 2 or code != 1213:
                raise e
            attempts += 1
            time.sleep(0.2)


django.db.backends.utils.CursorWrapper.execute = execute_wrapper
```
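The same retry loop can also be written as a decorator around a single function instead of patching `CursorWrapper.execute` globally. This is a minimal sketch, not taken from the linked post: `retry_on_deadlock` is a hypothetical helper, and `FakeOperationalError` is a stand-in for `django.db.OperationalError` so the example runs without Django or MySQL.

```python
import time
from functools import wraps

DEADLOCK_CODE = 1213  # MySQL error code for a detected deadlock


class FakeOperationalError(Exception):
    """Stand-in for django.db.OperationalError (assumption for this sketch)."""


def retry_on_deadlock(max_attempts=3, delay=0.0):
    """Retry the wrapped function when it fails with the deadlock code."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except FakeOperationalError as e:
                    code = e.args[0] if e.args else None
                    # Give up on the last attempt or on any other error code.
                    if attempt == max_attempts or code != DEADLOCK_CODE:
                        raise
                    time.sleep(delay)

        return wrapper

    return decorator


calls = []


@retry_on_deadlock(max_attempts=3)
def flaky_query():
    # Fails twice with a deadlock, then succeeds on the third call.
    calls.append(1)
    if len(calls) < 3:
        raise FakeOperationalError(DEADLOCK_CODE, "Deadlock found")
    return "ok"
```

A decorator limits the retry to operations that are known to be safe to repeat, whereas the global patch retries every statement.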

November 6, 2023 · 1 min

utf8 -> cp932

UTF8 -> CP932

[pandas] What to do when specifying cp932 for CSV output raises a UnicodeError
[Python3] Converting between Shift_JIS, UTF-8, and ASCII
Codecs: error handlers
UTF-8 → cp932 (Shift_JIS) conversion table

October 11, 2023 · 1 min

Django dynamic model generation

Django: dynamic model generation

Create multiple copies of the same table from defined meta information.

```python
from pathlib import Path

import pandas as pd
from django.conf import settings
from django.db import connection, models

# Note: META_DIR (the directory holding the definition CSVs) is not defined in this excerpt.


def get_meta(name):
    """Definition info (CSV) file"""
    path = META_DIR / f"{name}.map.csv"
    return pd.read_csv(path, dtype="str").fillna("").to_dict(orient="records")


def named_model(name, code):
    meta = get_meta(name)

    class_name = f"{name}{code}"
    table_name = f"{name}_{code}"
    unique_fields = list(map(lambda i: i["column"], filter(lambda i: i["unique"] == "1", meta)))

    class Meta:
        db_table = f"oracle_{table_name}"
        constraints = [
            models.UniqueConstraint(
                fields=unique_fields,
                name=f"uniq_{table_name}",
            ),
        ]

    def _field(item):
        params = {"db_index": True} if item["db_index"] else {}
        params["blank"] = True
        return (
            item["column"],
            models.CharField(verbose_name=item["label"], max_length=100, null=True, default=None, **params),
        )

    fields = dict(map(_field, meta))
    attrs = {"__module__": "oracle.models", "Meta": Meta, **fields}
    models_class = type(class_name, (models.Model,), attrs)
    return models_class


def get_named_model(name, code):
    # Get the model class
    model_class = named_model(name, code)

    if model_class._meta.db_table in connection.introspection.table_names():
        # The table already exists, so we are done
        return model_class

    # Create the table
    with connection.schema_editor() as schema_editor:
        schema_editor.create_model(model_class)
    return model_class
```
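The core of `named_model` is the three-argument form of `type(name, bases, attrs)`, which builds a class at runtime. The sketch below shows that mechanism with plain Python objects instead of Django models; `make_record_class` and its module-level cache are hypothetical and only illustrate the idea.

```python
_CLASS_CACHE = {}  # (name, code) -> generated class


def make_record_class(name, code, columns):
    """Build (or reuse) a class named f"{name}{code}" with one attribute per column."""
    key = (name, code)
    if key in _CLASS_CACHE:
        return _CLASS_CACHE[key]

    attrs = {"table_name": f"{name}_{code}"}
    attrs.update({column: None for column in columns})

    # type(class_name, bases, attrs) is what a class statement does under the hood.
    cls = type(f"{name}{code}", (object,), attrs)
    _CLASS_CACHE[key] = cls
    return cls
```

Caching by `(name, code)` means repeated calls return the same class object rather than building a new one each time.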

August 16, 2023 · 1 min

Django Migration: Unique Field

Django: migration: adding a unique field

```python
# Generated by Django 3.2.20 on 2023-07-24 06:36

from django.db import migrations, models


def advise_fill_advise_code(apps, schema_editor):
    """Use instance.id as the initial value of instance.advise_code"""
    Model = apps.get_model("studies", "Advise")
    for instance in Model.objects.all():
        instance.advise_code = str(instance.id)
        instance.save()


class Migration(migrations.Migration):

    dependencies = [
        ("studies", "0015_auto_20211108_1814"),
    ]

    operations = [
        migrations.AddField(
            model_name="advise",
            name="advise_code",
            field=models.CharField(
                # Original:
                # help_text="(id)", max_length=6, unique=True, verbose_name="Advise Code"
                # Fix: use null=True and default=None
                help_text="(id)", max_length=6, null=True, default=None, unique=True, verbose_name="Advise Code"
            ),
            preserve_default=False,
        ),
        # Added: populate advise_code (unique)
        migrations.RunPython(advise_fill_advise_code, reverse_code=migrations.RunPython.noop),
        # Added: drop null=True and default=None
        migrations.AlterField(
            model_name="advise",
            name="advise_code",
            field=models.CharField(help_text="(id)", max_length=6, unique=True, verbose_name="Advise Code"),
            preserve_default=False,
        ),
    ]
```

July 24, 2023 · 1 min

unittest mock

patch.object(): replacing a method

The method sends an API request to an external server.
In the test, treat the server-to-server API call as successful and only advance the state.

```python
from unittest.mock import patch

from applies.utils import approve_to
from base.tests.utils import RestTestCase
from sourcing.models import PurchaseOrder


def fake_cloudsign_post_document(self, *args, **kwargs):
    # Treat the server-to-server API call as successful and advance the approval state
    approve_to(self.purchase, "purchase_print")


class PurchaseOrderTest(RestTestCase):
    @patch.object(PurchaseOrder, "cloudsign_post_document", new=fake_cloudsign_post_document)
    def test_create(self):
        # ....
        response = client.post("/api/rest/sourcing/purchaseorder/", params)
        ...
        instance = PurchaseOrder.objects.get(id=response.json()["id"])
        self.assertEqual(instance.purchase.purchase_status, "print")
```
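The same pattern can be tried with only the standard library. In this sketch `Order`, `fake_post_document`, and `submit_with_fake` are hypothetical stand-ins for the model, the fake, and the test body; the point is that a plain function passed as `new=` to `patch.object` on a class is bound like a normal method, so it receives `self`.

```python
from unittest.mock import patch


class Order:
    def __init__(self):
        self.status = "draft"

    def post_document(self):
        # Stands in for a method that would call an external server.
        raise ConnectionError("would call an external server")

    def submit(self):
        self.post_document()
        return self.status


def fake_post_document(self, *args, **kwargs):
    # Pretend the remote call succeeded and only advance the state.
    self.status = "printed"


def submit_with_fake():
    # The replacement is active only inside the with block.
    with patch.object(Order, "post_document", new=fake_post_document):
        return Order().submit()
```

Once the `with` block exits, `Order.post_document` is restored to the original method.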

June 26, 2023 · 1 min

django cache decorator

django: cache value decorator

```python
from functools import wraps
from django.core.cache import cache
```

```python
def cached_value(key=None, **kwargs):
    """
    Fetch the value stored under key from the cache and pass it as current.
    When reset=True is given, the cached data is not used.
    """

    def _cache_controller(original_func):
        @wraps(original_func)
        def _cache_controlled(*args, **kw):
            current = None if kw.get("reset", False) else cache.get(key)
            latest = original_func(*args, current=current, **kw)
            cache.set(key, latest)
            return latest

        return _cache_controlled

    return _cache_controller
```

```python
class PurchaseOrder:

    @cached_value(key="webservice_accesstoken")
    def get_accesstoken(self, current=None, reset=False):
        return Token(self.company.credentials).update_token(current)
```
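The behavior of the decorator can be checked without Django by swapping in a dict-backed cache. In the sketch below `DictCache` is a hypothetical stand-in for `django.core.cache.cache` (only `get` and `set`), and `next_value` is an illustrative function, not part of the original code.

```python
from functools import wraps


class DictCache:
    """Minimal stand-in for django.core.cache.cache (assumption for this sketch)."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


cache = DictCache()


def cached_value(key):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kw):
            # reset=True skips the cached value and starts from scratch.
            current = None if kw.get("reset", False) else cache.get(key)
            latest = func(*args, current=current, **kw)
            cache.set(key, latest)
            return latest

        return wrapper

    return decorator


@cached_value(key="counter")
def next_value(current=None, reset=False):
    # Derive the new value from the previously cached one, if any.
    return 1 if current is None else current + 1
```

Each call receives the previously stored value as `current` and stores its own result, so successive calls can build on the last value until `reset=True` is passed.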

June 22, 2023 · 1 min

at

at command

Enabling and trying out the at command on macOS

Steps:

In /System/Library/LaunchDaemons/com.apple.atrun.plist, change Disabled to false
Reload /System/Library/LaunchDaemons/com.apple.atrun.plist

```
% sudo launchctl load -w /System/Library/LaunchDaemons/com.apple.atrun.plist

/System/Library/LaunchDaemons/com.apple.atrun.plist: service already loaded
Load failed: 37: Operation already in progress
```

June 18, 2023 · 1 min