これは Django Advent Calendar 2020 - Qiita 兼 JSL(日本システム技研) Advent Calendar 2020 - Qiita の12/16分の記事です。
Djangoと共通テーブル式(Common Table Expression、CTE)を用いた再帰クエリを使って、階層構造を持つテーブルからデータを取得したいことがありました。
ただ、現在のDjangoでは「共通テーブル式の再帰クエリ」がサポートされていません。
#28919 (Add support for Common Table Expression (CTE) queries) – Django
生SQLを書いても良いのですが、IDEのサポートがほしかったのでライブラリを探したところ、 django-cte
がありました。
dimagi/django-cte: Common Table Expressions (CTE) for Django
そこで、django-cteと共通テーブル式を用いた再帰クエリを使った時のメモを残します。
目次
環境
そもそもやりたかったこと
リンゴの親子関係という階層構造を持つデータがあり、RDBに階層構造を保持したいとします*1*2。
. ├── 東光 │ └── 千秋 │ ├── シナノゴールド │ │ └── 奥州ロマン │ └── 秋映 └── 国光 └── フジ └── シナノスイート
また、この階層構造の途中のデータを取得すると、その祖先のデータもすべて取得したいとします。
例えば、「シナノゴールド」を指定すると、祖先の「千秋」「東光」も取得したいとします。
どうやって実現するか
階層構造をRDBに保持する方法としては、書籍
- プログラマのためのSQLグラフ原論 リレーショナルデータベースで木と階層構造を扱うために(ジョー・セルコ ミック ミック)|翔泳社の本
- O'Reilly Japan - SQLアンチパターンの2章「素朴な木(ナイーブツリー)」
にていくつか言及があります。
ただ、今回やりたいことは比較的単純な階層構造であることに加え、SQLグラフ原論にて
と書かれていることから、隣接リストモデルにて表現します。
また、隣接リストからの取り出しについては、SQLアンチパターンに
また、隣接リストに格納された階層構造をサポートするSQL拡張機能を備えているデータベース製品もあります。SQL-99標準では、WITHキーワードの後に共通テーブル式(Common Table Expression: CTE) を指定する形式の再帰クエリ構文を定義しています。
共通テーブル式を用いた再帰クエリは、Microsoft SQL Server 2005、Oracle Database 11g、IBM DB2、MySQL 8.0、PostgreSQL 8.4、SQLite 3.8.3、Firebird 2.1 でサポートされています
とあります。
Djangoが公式サポートしているRDBは、いずれも上記に含まれています。
Databases | Django documentation | Django
そこで今回は、共通テーブル式を用いた再帰クエリにて実装することとします。
Djangoでの実装
モデル
今回は
という構造とします。
なお、外部キー「親のサロゲートキー」では自己参照となりますが、Djangoでは ForeignKey
に self
を渡すことで可能になります。
https://docs.djangoproject.com/ja/3.1/ref/models/fields/#foreignkey
from django.db import models class Apple(models.Model): name = models.CharField('名前', max_length=20) parent = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True) class Meta: db_table = 'apple'
このモデルの中身は以下を想定しています。
id | name | parent |
---|---|---|
1 | 東光 | NULL |
2 | 千秋 | 1 |
3 | シナノゴールド | 2 |
4 | 奥州ロマン | 3 |
5 | 秋映 | 2 |
6 | 国光 | NULL |
7 | フジ | 6 |
8 | シナノスイート | 7 |
なお、親へさかのぼれないリンゴは、parent
に NULL
を設定しています。
NULLを使ったのは、書籍「プログラマのためのSQLグラフ原論」のp24にも「最もよくある表現」と書かれていたためです。
もし他の値を設定したい場合は、同書の同ページにて言及されています。
共通テーブル式を用いた再帰クエリの書き方
モデルができたので、次は取得するクエリを作成します。
まずは、Djangoで生SQL版を実装する前に、共通テーブル式を用いた再帰クエリの書き方を見ていきます。
共通テーブル式はSQL99に含まれます。
新しい業界標準「SQL99」詳細解説
イメージ的にはこんな感じです。
WITH RECURSIVE <table> (<field>, ...) /* 集めたデータを入れるテーブルとその項目 */ AS ( /* 起点となるレコードを抽出する箇所 */ UNION ALL /* 再帰してレコードを抽出する箇所 */ ) SELECT * FROM <table>; /* 集めたデータに対する処理 */
Djangoの生SQLで抽出
上記SQLイメージを元に、Djangoでの生SQL版を実装します。
集めたデータを入れるテーブルとその項目
共通テーブル名として、今回は tree
としました。
また項目については、Appleモデルの id
と name
、 parent_id
を用意します *3。
それに加え、何階層さかのぼっているのかを確認するための項目 node
を用意します*4。
WITH RECURSIVE tree
(node, id, name, parent_id)
起点となるレコードを抽出する箇所
起点となるレコードを特定するため、WHERE句を用意したSQLになります。
SELECT 0 AS node, base.id, base.name, base.parent_id FROM apple AS base WHERE base.id = %s
ここでは起点となるレコードなので、 node
には 0
という固定値を設定します。
また、別の箇所で apple テーブルからの抽出を行うため、 AS
で別名を付けておきます。
他に、WHERE句に %s
でプレースホルダーを用意します。
https://docs.djangoproject.com/ja/3.1/topics/db/sql/#passing-parameters-into-raw
再帰してレコードを抽出する箇所
起点と UNION ALL
するSQLになります。
自身の id と共通テーブルの parent_id で INNER JOIN します。
SELECT tree.node + 1 AS node, apple.id, apple.name, apple.parent_id FROM apple INNER JOIN tree ON apple.id = tree.parent_id
集めたデータに対する処理
こちらの普通のSELECTです。
SELECT * FROM tree ORDER BY node;
SQLの全体像
こんな感じになりました。
WITH RECURSIVE tree (node, id, name, parent_id) AS ( SELECT 0 AS node, base.id, base.name, base.parent_id FROM apple AS base WHERE base.id = %s UNION ALL SELECT tree.node + 1 AS node, apple.id, apple.name, apple.parent_id FROM apple INNER JOIN tree ON apple.id = tree.parent_id ) SELECT * FROM tree ORDER BY node;
動作確認
のちほど django-cte版も同じになるか確認するため、以下のようなassertするヘルパメソッドを用意します。
def assertCte(self, actual): # 件数 self.assertEqual(len(actual), 3) # シナノゴールド自身があること own = actual[0] self.assertEqual(own.node, 0) self.assertEqual(own.name, 'シナノゴールド') # シナノゴールドの親(千秋) parent = actual[1] self.assertEqual(parent.node, 1) self.assertEqual(parent.name, '千秋') # 千秋の親(東光) grandparent = actual[2] self.assertEqual(grandparent.node, 2) self.assertEqual(grandparent.name, '東光')
その後、ヘルパメソッドを使ったテストコードを書いたところ、テストがパスしました。
class TestRecursive(TestCase): def test_1_raw_sql(self): shinano_gold = Apple.objects.get(name='シナノゴールド') apples = Apple.objects.raw( """ WITH RECURSIVE tree (node, id, name, parent_id) AS ( SELECT 0 AS node, base.id, base.name, base.parent_id FROM apple AS base WHERE base.id = %s UNION ALL SELECT tree.node + 1 AS node, apple.id, apple.name, apple.parent_id FROM apple INNER JOIN tree ON apple.id = tree.parent_id ) SELECT * FROM tree ORDER BY node; """ , [shinano_gold.pk]) self.assertCte(apples)
django-cteでの抽出
次に、django-cte での抽出を試します。
インストール
pipでインストールします。
pip install django-cte
モデルの objects を差し替え
次に、モデルの objects
を CTEManager
へ差し替えます。
from django_cte import CTEManager class Apple(models.Model): ... objects = CTEManager() # 追加 class Meta: db_table = 'apple'
django-cteのクエリ全体像
READMEの Recursive Common Table Expressions
と、生SQLの書き方を見比べると以下のようでした。
# WITH RECURSIVE ... AS() を関数化 def make_regions_cte(cte): return Region.objects.filter( ... ).union( ... all=True, ) # Withに割り当て cte = With.recursive(make_regions_cte) # 共通テーブルからの抽出 regions = (...)
そこで、コメントしたそれぞれの機能を実装していきます。
WITH RECURSIVE ... AS() を関数化
With.recursive()
に渡す部分を関数化します。
recursive()関数のソースコードを読むと
:param make_cte_queryset: Function taking a single argument (a not-yet-fully-constructed cte object) and returning a `QuerySet` object. The returned `QuerySet` normally consists of an initial statement unioned with a recursive statement. https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L42
とあったため、QuerySetではなくQuerySetを返す関数を用意すれば良さそうです。
まずは起点となる部分の絞り込みを作ります。
shinano_gold = Apple.objects.get(name='シナノゴールド') Apple.objects.filter( id=shinano_gold.pk )
続いて、annotate()
メソッドを使って、さかのぼり番号 node
を追加します。
https://docs.djangoproject.com/ja/3.1/ref/models/querysets/#annotate
Apple.objects.filter( id=shinano_gold.pk # 追加 ).annotate( node=Value(0, output_field=IntegerField()),
最後に UNION ALL 後の部分を追加します。
Apple.objects.filter( id=shinano_gold.pk ).annotate( node=Value(0, output_field=IntegerField()), # 追加 ).union( cte.join(Apple, id=cte.col.parent_id) .annotate(node=cte.col.node + Value(1, output_field=IntegerField())), all=True, )
あとはこのQuerySetを返せば関数が完成します。
With.recursiveに割り当て
関数を引数として渡すだけです。
cte = With.recursive(make_cte)
共通テーブルからの抽出
READMEの場合 With
オブジェクトの join()
メソッドを使っています。
しかし、今回の場合はWITH RECURSIVE ... AS() を関数化したところで UNION ALL しているため、これ以上のJOINは不要です。
そのため、Withオブジェクトの queryset()
メソッドで、ここまで処理してきたQuerySetを取り出し、共通テーブルの処理へとつなげます。
https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L92
apples = (
cte.queryset()
.with_cte(cte)
.annotate(node=cte.col.node)
.order_by('node')
)
全体像
django-cte版の全体像はこんな感じです。
def make_cte(cte): shinano_gold = Apple.objects.get(name='シナノゴールド') return Apple.objects.filter( id=shinano_gold.pk ).annotate( node=Value(0, output_field=IntegerField()), ).union( cte.join(Apple, id=cte.col.parent_id) .annotate(node=cte.col.node + Value(1, output_field=IntegerField())), all=True, ) cte = With.recursive(make_cte) apples = ( cte.queryset() .with_cte(cte) .annotate(node=cte.col.node) .order_by('node') )
動作確認
同じように、自作のヘルパメソッド self.assertCte(apples)
を使ってテストコードで動作確認したところ、想定通りの動きとなりました。
また、発行されるSQLも想定通りでした。
WITH RECURSIVE cte AS( SELECT "apple"."id", "apple"."name", "apple"."parent_id", 0 AS "node" FROM "apple" WHERE "apple"."id" = 3 UNION ALL SELECT "apple"."id", "apple"."name", "apple"."parent_id", ("cte"."node" + 1) AS "node" FROM "apple" INNER JOIN "cte" ON "apple"."id" = "cte"."parent_id" ) SELECT "cte"."id", "cte"."name", "cte"."parent_id", "cte"."node" AS "node" FROM "cte" ORDER BY "node" ASC
その他
django-cteでルートのデータを取得する
今回の場合で言えば、「国光」のみを取得したいとなります。
とはいえ、書き方は上記の場合と変わらず、最初の Apple.objects.filter()
の条件が異なるのみです。
以下のテストもパスします。
def test_3_django_cte_root(self): def make_cte(cte): kokko = Apple.objects.get(name='国光') return Apple.objects.filter( id=kokko.pk ).annotate( node=Value(0, output_field=IntegerField()), ).union( cte.join(Apple, id=cte.col.parent_id) .annotate(node=cte.col.node + Value(1, output_field=IntegerField())), all=True, ) cte = With.recursive(make_cte) apples = ( cte.queryset() .with_cte(cte) .annotate(node=cte.col.node) .order_by('node') ) self.assertEqual(len(apples), 1) apple = apples.get() self.assertEqual(apple.node, 0) self.assertEqual(apple.name, '国光')
発行されるSQLも同じです。
WITH RECURSIVE cte AS( SELECT "apple"."id", "apple"."name", "apple"."parent_id", 0 AS "node" FROM "apple" WHERE "apple"."id" = 6 UNION ALL SELECT "apple"."id", "apple"."name", "apple"."parent_id", ("cte"."node" + 1) AS "node" FROM "apple" INNER JOIN "cte" ON "apple"."id" = "cte"."parent_id" ) SELECT "cte"."id", "cte"."name", "cte"."parent_id", "cte"."node" AS "node" FROM "cte" ORDER BY "node" ASC
django-cteの戻り値をdictとして取得したい
WITH RECURSIVE ... AS() を関数化した時の関数の中で、 values() メソッドを使います。
def make_cte(cte): shinano_gold = Apple.objects.get(name='シナノゴールド') return Apple.objects.filter( id=shinano_gold.pk # ここで values() ).values( 'id', 'parent', 'name', node=Value(0, output_field=IntegerField()), ).union( cte.join(Apple, id=cte.col.parent_id) # こちらもvalues() .values( 'id', 'parent', 'name', node=cte.col.node + Value(1, output_field=IntegerField())), all=True, )
dictなのでテストコードが少し変わります。
self.assertEqual(len(apples), 3) # シナノゴールド自身があること own = apples[0] self.assertEqual(own['node'], 0) self.assertEqual(own['name'], 'シナノゴールド') # シナノゴールドの親(千秋) own = apples[1] self.assertEqual(own['node'], 1) self.assertEqual(own['name'], '千秋') # 千秋の親(東光) own = apples[2] self.assertEqual(own['node'], 2) self.assertEqual(own['name'], '東光')
ソースコード
Githubに上げました。
https://github.com/thinkAmi-sandbox/django_cte-sample
*1:リンゴには「種子親」と「花粉親」がありますが、わかりやすくするため今回は種子親のみの階層構造とします。
*2:実際のところ、東光は「ゴールデンデリシャス x 印度」の交配で生まれていますが、わかりやすくするためそれ以上の親はさかのぼらないとします。参考: 東光 - 青森県の市販のりんごと話題のりんご
*3:生SQLなので、モデルのフィールド名「parent」ではなく、実際のテーブル列名「parent_id」を指定します
*4:depthという名前の方が良いのかもしれませんが、RDBによっては使われる名前であることと、はてなブログでシンタックスハイライトされてしまったため、「node」としました