これは Django Advent Calendar 2020 - Qiita 兼 JSL(日本システム技研) Advent Calendar 2020 - Qiita の12/16分の記事です。
Djangoと共通テーブル式(Common Table Expression、CTE)を用いた再帰クエリを使って、階層構造を持つテーブルからデータを取得したいことがありました。
ただ、現在のDjangoでは「共通テーブル式の再帰クエリ」がサポートされていません。
#28919 (Add support for Common Table Expression (CTE) queries) – Django
生SQLを書いても良いのですが、IDEのサポートがほしかったのでライブラリを探したところ、 django-cte
がありました。
dimagi/django-cte: Common Table Expressions (CTE) for Django
そこで、django-cteと共通テーブル式を用いた再帰クエリを使った時のメモを残します。
目次
環境
そもそもやりたかったこと
リンゴの親子関係という階層構造を持つデータがあり、RDBに階層構造を保持したいとします*1*2。
.
├── 東光
│ └── 千秋
│ ├── シナノゴールド
│ │ └── 奥州ロマン
│ └── 秋映
└── 国光
└── フジ
└── シナノスイート
また、この階層構造の途中のデータを取得すると、その祖先のデータもすべて取得したいとします。
例えば、「シナノゴールド」を指定すると、祖先の「千秋」「東光」も取得したいとします。
どうやって実現するか
階層構造をRDBに保持する方法としては、書籍
にていくつか言及があります。
ただ、今回やりたいことは比較的単純な階層構造であることに加え、SQLグラフ原論にて
RDB/SQLで階層構造を表現するメジャーな手段は、現在においても隣接リストモデルであるのは、動かしがたい事実
プログラマのためのSQLグラフ原論(初版第1刷) 付録 訳者による解説 (ミック) p311
と書かれていることから、隣接リストモデルにて表現します。
また、隣接リストからの取り出しについては、SQLアンチパターンに
また、隣接リストに格納された階層構造をサポートするsQL拡張機能を備えているデータベース製品もあります。SQL-99標準では、WITHキーワードの後に共通テーブル式(Common Table Expression: CTE) を指定する形式の再帰クエリ構文を定義しています。
共通テーブル式を用いた再帰クエリは、Microsoft SQL Server 2005、Oracle Database 11g、IBM DB2、MySQL 8.0、PostgreSQL 8.4、SQLite 3.8.3、Firebird 2.1 でサポートされています
SQLアンチパターン(初版第10刷) 2章ナイーブツリー(素朴な木) p19
とあります。
Djangoが公式サポートしているRDBは、いずれも上記に含まれています。
Databases | Django documentation | Django
そこで今回は、共通テーブル式を用いた再帰クエリにて実装することとします。
モデル
RDBに保存するため、Djangoのモデルを定義します。
今回は
という構造とします。
なお、外部キー「親のサロゲートキー」では自己参照となりますが、Djangoでは ForeignKey
に self
を渡すことで可能になります。
https://docs.djangoproject.com/ja/3.1/ref/models/fields/#foreignkey
from django.db import models
class Apple(models.Model):
name = models.CharField('名前', max_length=20)
parent = models.ForeignKey('self',
on_delete=models.SET_NULL,
null=True,
blank=True)
class Meta:
db_table = 'apple'
このモデルの中身は以下を想定しています。
id |
name |
parent |
1 |
東光 |
NULL |
2 |
千秋 |
1 |
3 |
シナノゴールド |
2 |
4 |
奥州ロマン |
3 |
5 |
秋映 |
2 |
6 |
国光 |
NULL |
7 |
フジ |
6 |
8 |
シナノスイート |
7 |
なお、親へさかのぼれないリンゴは、parent
に NULL
を設定しています。
NULLを使ったのは、書籍「プログラマのためのSQLグラフ原論」のp24にも「最もよくある表現」と書かれていたためです。
もし他の値を設定したい場合は、同書の同ページにて言及されています。
モデルができたので、次は取得するクエリを作成します。
まずは、Djangoで生SQL版を実装する前に、共通テーブル式を用いた再帰クエリの書き方を見ていきます。
共通テーブル式はSQL99に含まれます。
新しい業界標準「SQL99」詳細解説
イメージ的にはこんな感じです。
WITH RECURSIVE <table> (<field>, ...)
AS (
UNION ALL
)
SELECT * FROM <table>;
上記SQLイメージを元に、Djangoでの生SQL版を実装します。
集めたデータを入れるテーブルとその項目
共通テーブル名として、今回は tree
としました。
また項目については、Appleモデルの id
と name
、 parent_id
を用意します *3。
それに加え、何階層さかのぼっているのかを確認するための項目 node
を用意します*4。
WITH RECURSIVE tree
(node, id, name, parent_id)
起点となるレコードを抽出する箇所
起点となるレコードを特定するため、WHERE句を用意したSQLになります。
SELECT 0 AS node, base.id, base.name, base.parent_id
FROM apple AS base
WHERE base.id = %s
ここでは起点となるレコードなので、 node
には 0
という固定値を設定します。
また、別の箇所で apple テーブルからの抽出を行うため、 AS
で別名を付けておきます。
他に、WHERE句に %s
でプレースホルダーを用意します。
https://docs.djangoproject.com/ja/3.1/topics/db/sql/#passing-parameters-into-raw
再帰してレコードを抽出する箇所
起点と UNION ALL
するSQLになります。
自身の id と共通テーブルの parent_id で INNER JOIN します。
SELECT tree.node + 1 AS node,
apple.id,
apple.name,
apple.parent_id
FROM apple
INNER JOIN tree
ON apple.id = tree.parent_id
集めたデータに対する処理
こちらの普通のSELECTです。
SELECT *
FROM tree
ORDER BY node;
こんな感じになりました。
WITH RECURSIVE tree
(node, id, name, parent_id)
AS (
SELECT 0 AS node, base.id, base.name, base.parent_id
FROM apple AS base
WHERE base.id = %s
UNION ALL
SELECT tree.node + 1 AS node,
apple.id,
apple.name,
apple.parent_id
FROM apple
INNER JOIN tree
ON apple.id = tree.parent_id
) SELECT *
FROM tree
ORDER BY node;
動作確認
のちほど django-cte版も同じになるか確認するため、以下のようなassertするヘルパメソッドを用意します。
def assertCte(self, actual):
self.assertEqual(len(actual), 3)
own = actual[0]
self.assertEqual(own.node, 0)
self.assertEqual(own.name, 'シナノゴールド')
parent = actual[1]
self.assertEqual(parent.node, 1)
self.assertEqual(parent.name, '千秋')
grandparent = actual[2]
self.assertEqual(grandparent.node, 2)
self.assertEqual(grandparent.name, '東光')
その後、ヘルパメソッドを使ったテストコードを書いたところ、テストがパスしました。
class TestRecursive(TestCase):
def test_1_raw_sql(self):
shinano_gold = Apple.objects.get(name='シナノゴールド')
apples = Apple.objects.raw(
"""
WITH RECURSIVE tree
(node, id, name, parent_id)
AS (
SELECT 0 AS node, base.id, base.name, base.parent_id
FROM apple AS base
WHERE base.id = %s
UNION ALL
SELECT tree.node + 1 AS node,
apple.id,
apple.name,
apple.parent_id
FROM apple
INNER JOIN tree
ON apple.id = tree.parent_id
) SELECT *
FROM tree
ORDER BY node;
"""
, [shinano_gold.pk])
self.assertCte(apples)
次に、django-cte での抽出を試します。
インストール
pipでインストールします。
pip install django-cte
モデルの objects を差し替え
次に、モデルの objects
を CTEManager
へ差し替えます。
from django_cte import CTEManager
class Apple(models.Model):
...
objects = CTEManager()
class Meta:
db_table = 'apple'
READMEの Recursive Common Table Expressions
と、生SQLの書き方を見比べると以下のようでした。
def make_regions_cte(cte):
return Region.objects.filter(
...
).union(
...
all=True,
)
cte = With.recursive(make_regions_cte)
regions = (...)
そこで、コメントしたそれぞれの機能を実装していきます。
WITH RECURSIVE ... AS() を関数化
With.recursive()
に渡す部分を関数化します。
recursive()関数のソースコードを読むと
:param make_cte_queryset: Function taking a single argument (a
not-yet-fully-constructed cte object) and returning a `QuerySet`
object. The returned `QuerySet` normally consists of an initial
statement unioned with a recursive statement.
https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L42
とあったため、QuerySetではなくQuerySetを返す関数を用意すれば良さそうです。
まずは起点となる部分の絞り込みを作ります。
shinano_gold = Apple.objects.get(name='シナノゴールド')
Apple.objects.filter(
id=shinano_gold.pk
)
続いて、annotate()
メソッドを使って、さかのぼり番号 node
を追加します。
https://docs.djangoproject.com/ja/3.1/ref/models/querysets/#annotate
Apple.objects.filter(
id=shinano_gold.pk
).annotate(
node=Value(0, output_field=IntegerField()),
最後に UNION ALL 後の部分を追加します。
Apple.objects.filter(
id=shinano_gold.pk
).annotate(
node=Value(0, output_field=IntegerField()),
).union(
cte.join(Apple, id=cte.col.parent_id)
.annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
all=True,
)
あとはこのQuerySetを返せば関数が完成します。
With.recursiveに割り当て
関数を引数として渡すだけです。
cte = With.recursive(make_cte)
共通テーブルからの抽出
READMEの場合 With
オブジェクトの join()
メソッドを使っています。
しかし、今回の場合はWITH RECURSIVE ... AS() を関数化したところで UNION ALL しているため、これ以上のJOINは不要です。
そのため、Withオブジェクトの queryset()
メソッドで、ここまで処理してきたQuerySetを取り出し、共通テーブルの処理へとつなげます。
https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L92
apples = (
cte.queryset()
.with_cte(cte)
.annotate(node=cte.col.node)
.order_by('node')
)
全体像
django-cte版の全体像はこんな感じです。
def make_cte(cte):
shinano_gold = Apple.objects.get(name='シナノゴールド')
return Apple.objects.filter(
id=shinano_gold.pk
).annotate(
node=Value(0, output_field=IntegerField()),
).union(
cte.join(Apple, id=cte.col.parent_id)
.annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
all=True,
)
cte = With.recursive(make_cte)
apples = (
cte.queryset()
.with_cte(cte)
.annotate(node=cte.col.node)
.order_by('node')
)
動作確認
同じように、自作のヘルパメソッド self.assertCte(apples)
を使ってテストコードで動作確認したところ、想定通りの動きとなりました。
また、発行されるSQLも想定通りでした。
WITH RECURSIVE cte AS(
SELECT
"apple"."id",
"apple"."name",
"apple"."parent_id",
0 AS "node"
FROM
"apple"
WHERE
"apple"."id" = 3
UNION ALL
SELECT
"apple"."id",
"apple"."name",
"apple"."parent_id",
("cte"."node" + 1) AS "node"
FROM
"apple"
INNER JOIN
"cte"
ON "apple"."id" = "cte"."parent_id"
)
SELECT
"cte"."id",
"cte"."name",
"cte"."parent_id",
"cte"."node" AS "node"
FROM
"cte"
ORDER BY
"node" ASC
その他
django-cteでルートのデータを取得する
今回の場合で言えば、「国光」のみを取得したいとなります。
とはいえ、書き方は上記の場合と変わらず、最初の Apple.objects.filter()
の条件が異なるのみです。
以下のテストもパスします。
def test_3_django_cte_root(self):
def make_cte(cte):
kokko = Apple.objects.get(name='国光')
return Apple.objects.filter(
id=kokko.pk
).annotate(
node=Value(0, output_field=IntegerField()),
).union(
cte.join(Apple, id=cte.col.parent_id)
.annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
all=True,
)
cte = With.recursive(make_cte)
apples = (
cte.queryset()
.with_cte(cte)
.annotate(node=cte.col.node)
.order_by('node')
)
self.assertEqual(len(apples), 1)
apple = apples.get()
self.assertEqual(apple.node, 0)
self.assertEqual(apple.name, '国光')
発行されるSQLも同じです。
WITH RECURSIVE cte AS(
SELECT
"apple"."id",
"apple"."name",
"apple"."parent_id",
0 AS "node"
FROM
"apple"
WHERE
"apple"."id" = 6
UNION ALL
SELECT
"apple"."id",
"apple"."name",
"apple"."parent_id",
("cte"."node" + 1) AS "node"
FROM
"apple"
INNER JOIN
"cte"
ON "apple"."id" = "cte"."parent_id"
)
SELECT
"cte"."id",
"cte"."name",
"cte"."parent_id",
"cte"."node" AS "node"
FROM
"cte"
ORDER BY
"node" ASC
django-cteの戻り値をdictとして取得したい
WITH RECURSIVE ... AS() を関数化した時の関数の中で、 values() メソッドを使います。
def make_cte(cte):
shinano_gold = Apple.objects.get(name='シナノゴールド')
return Apple.objects.filter(
id=shinano_gold.pk
).values(
'id',
'parent',
'name',
node=Value(0, output_field=IntegerField()),
).union(
cte.join(Apple, id=cte.col.parent_id)
.values(
'id',
'parent',
'name',
node=cte.col.node + Value(1, output_field=IntegerField())),
all=True,
)
dictなのでテストコードが少し変わります。
self.assertEqual(len(apples), 3)
own = apples[0]
self.assertEqual(own['node'], 0)
self.assertEqual(own['name'], 'シナノゴールド')
own = apples[1]
self.assertEqual(own['node'], 1)
self.assertEqual(own['name'], '千秋')
own = apples[2]
self.assertEqual(own['node'], 2)
self.assertEqual(own['name'], '東光')
Githubに上げました。
https://github.com/thinkAmi-sandbox/django_cte-sample