Python + PuLP + ortoolpy による組合せ最適化を使って、行事の当番表を作ってみた

最近、行事の当番表を作る機会がありました。

行事の回数や当番対象の人数がそれなりだったこと、「今日の当番は何でこの組み合わせなの?」と質問された時に「プログラムが勝手にやりました」と答えたかったことから、プログラムを作って解決することにしました。

 
目次

環境

  • Python 3.9.1
  • ortoolpy 0.2.38
  • pandas 1.2.0
  • numpy 1.19.5
  • openpyxl 3.0.6
    • Excelへ出力する時に使用

 

ランダムな組み合わせ編

実装

当番をする人の組み合わせは自由とのことでした。

その他の条件を聞いたところ、以下でした。

  • 行事は18回
  • 一行事あたり3名の当番が必要
  • 一人1回当番をすれば良い

 
そこで、上記の条件を満たしつつ、「行事ごとにグループを作り、そのグループへランダムに人を割り当てる」仕様にて、Pythonスクリプトを書きました。

# shuffle.py
GROUP_COUNT = 18

def divide_users(users):
    current_index = 0
    
    # 当番の人のリストを要素として持つ、行事のリスト
    groups = [[] for _ in range(GROUP_COUNT)]

    random.shuffle(users)

    for user in users:
        groups[current_index].append(user)

        if current_index < GROUP_COUNT - 1:
            current_index += 1
        else:
            current_index = 0

    return groups

 

テスト

動作を確認するため、テストコードを用意します。

# test_shuffle.py
import unittest
from shuffle import divide_users

class TestShuffle(unittest.TestCase):
    def test_divide_users(self):
        users = [f'user{i}' for i in range(1, 54 + 1)]
        group = divide_users(users)

        user_set = set()
        for users in group:
            # 1当番3名か
            self.assertEqual(len(users), 3)

            # 1回のみ割り当てられているか
            for user in users:
                self.assertNotIn(user, user_set)
                user_set.add(user)


if __name__ == '__main__':
    unittest.main()

 
実行したところ、テストがパスしました。

% python test_shuffle.py
.
----------------------------------------------------------------------
Ran 1 test in 0.000s

OK

 
これで良さそうです。

 

組合せ最適化編

前置き

ひとまず作ってみたものの、現実には「この日はちょっと用事があるので、当番はできない」がありそうです。

そこで各個人で「行事ごとに、当番可/不可を記入」したリストを用意してもらう前提で、プログラムを改修することにしました。

 
当番表の作成はアルバイトのシフトを組む感じなので、組合せ最適化とかナーススケジューリングの方面でやり方を探したところ、参考となる記事がありました。

 
遺伝的アルゴリズムは面白そうだったのですが、今の自分だと yak shaving になりそうだったので、組合せ最適化で作ることにしました。

 

目的関数と制約条件

ナーススケジューリングの場合、一人が複数回シフトに入ります。一方、今回は一人1回のシフトです。

そこで、組合せ最適化でナーススケジューリング問題を解く - Qiita をベースにしつつ、目的関数と制約条件を差し替えることにしました。

 
ベースコードでは目的関数として複数挙げられています。ただ、今回は「必要人数差」だけそうです。これが最小になるものを求めます。

次に制約条件ですが、こちらはベースコードとはだいぶ異なります。

  • 1当番あたり3人
  • 一人あたり1回の当番
    • 複数回は不可
  • その人が当番可能な時に割り当てる
    • 無理に当番をお願いすることはできない

 
また、各個人からもらったリストを元に、当番可/不可のExcel表を作成します。その書式は以下のとおりです。

    • 行事の開催回数
    • 当番対象者
  • セル
    • 1 が当番可
    • 0 が当番

 
イメージはこんな感じです。

f:id:thinkAmi:20210202082108p:plain

 

実装

Excelを読んで当番のグループを作る関数を用意します。

 

全体像

個別の実装は後述するとして、まずは全体像です。

def divide_users():
    # デフォルトだと 54 x 54 で読まれるので、不要行は読み込まないようにする
    skip_rows = [i + GROUP_COUNT for i in range(1, USER_COUNT - GROUP_COUNT + 1)]
    df = pd.read_excel(FILE_NAME, header=0, index_col=0, skiprows=skip_rows)

    # 当番回数
    event_count = df.shape[0]
    # print(f'{type(box_size)}: {box_size}')
    # => <class 'int'>: 18

    # ユーザ数
    user_count = df.shape[1]
    # print(f'{type(user_size)}: {user_size}')
    # => <class 'int'>: 54

    # 数理モデル
    model = LpProblem()

    # 変数を準備(当番/非当番の2値なので、0-1変数リスト)
    # https://docs.pyq.jp/python/math_opt/pdopt.html
    var_schedule = np.array(addbinvars(event_count, user_count))
    df['必要人数差'] = addvars(event_count)

    # 重み
    weight = 1

    # 目的関数の割り当て
    model += lpSum(df.必要人数差) * weight

    # 制約
    # 1当番あたり3人
    for idx, row in df.iterrows():
        model += row.必要人数差 >= (lpSum(var_schedule[row.name]) - 3)
        model += row.必要人数差 >= -(lpSum(var_schedule[row.name]) - 3)

    # 一人あたり1回当番すればよい
    for user in range(user_count):
        scheduled = [var_schedule[event, user] for event in range(event_count)]
        model += lpSum(pd.Series(scheduled)) <= 1

    # 当番可能なイベントだけ割り当てる
    df_rev = df[df.columns].apply(lambda r: 1 - r[df.columns], 1)
    for (_, d), (_, s) in zip(df_rev.iterrows(), pd.DataFrame(var_schedule).iterrows()):
        model += lpDot(d, s) <= 0

    # 実行
    model.solve()

    # 結果取得
    vectorized_results = np.vectorize(value)(var_schedule).astype(int)
    # print(type(vectorized_results))
    # => <class 'numpy.ndarray'>

    group = [[] for _ in range(event_count)]
    for i, vectorized_result in enumerate(vectorized_results):
        for result, name in zip(vectorized_result, df.columns):
            if result * name:
                group[i].append(name)

    return group

 

個別に見る

まずはpandasでExcelからデータを読み込みます。

    skip_rows = [i + GROUP_COUNT for i in range(1, USER_COUNT - GROUP_COUNT + 1)]
    df = pd.read_excel(FILE_NAME, header=0, index_col=0, skiprows=skip_rows)

    # 当番回数
    event_count = df.shape[0]

    # ユーザ数
    user_count = df.shape[1]

 
数理モデルを作成します。

model = LpProblem()

 
次に、ortoolpyを使った変数を用意します。
データ分析と最適化 — Pythonオンライン学習サービス PyQ(パイキュー)ドキュメント

今回は当番/非当番の2値を持っていれば良いので、0-1変数リストとします。

# スケジュールされるもの
var_schedule = np.array(addbinvars(event_count, user_count))

# スケジュールした時の人数差
df['必要人数差'] = addvars(event_count)

 
次は目的関数です。重みを乗じた上で値が最小となるものを探します。

# 重み
weight = 1

# 目的関数の割り当て
model += lpSum(df.必要人数差) * weight

 
制約条件その1は「1当番あたり3人」です。こちらはベースと変わりません。

for idx, row in df.iterrows():
    model += row.必要人数差 >= (lpSum(var_schedule[row.name]) - 3)
    model += row.必要人数差 >= -(lpSum(var_schedule[row.name]) - 3)

 
制約条件その2は「一人あたりの当番は1回」です。

ユーザーごとにイベントで当番が割り当てられたかをサマリし、回数が1になっているものとします。

for user in range(user_count):
    scheduled = [var_schedule[event, user] for event in range(event_count)]
    model += lpSum(pd.Series(scheduled)) <= 1

 
制約条件その3は「当番可能なときだけ割り当てる」です。
Pythonでシフトを自動作成するアプリを作成、運用した話 | 機械学習、ウェブ開発、自動化の備忘録

df_rev = df[df.columns].apply(lambda r: 1 - r[df.columns], 1)
for (_, d), (_, s) in zip(df_rev.iterrows(), pd.DataFrame(var_schedule).iterrows()):
    model += lpDot(d, s) <= 0

 
あとは、実行した後、当番グループを作っています。

# 実行
model.solve()

# 結果取得
vectorized_results = np.vectorize(value)(var_schedule).astype(int)
# print(type(vectorized_results))
# => <class 'numpy.ndarray'>

group = [[] for _ in range(event_count)]
for i, vectorized_result in enumerate(vectorized_results):
    for result, name in zip(vectorized_result, df.columns):
        if result * name:
            group[i].append(name)

return group

 

テスト

今回もテストコードにて確認します。

 

テストデータ作成

テストをするために、適当な当番可/不可一覧のExcelファイルを生成します。

当番可能なのは、一人あたり全体の6割とします。

import copy
import random

import openpyxl

OK = [1] * 10  # 18回中6割がOK
NG = [0] * 8

FILE_NAME = 'users_for_optimization.xlsx'
GROUP_COUNT = 18
USER_COUNT = 54


def main():
    wb = openpyxl.Workbook()
    ws = wb.worksheets[0]

    # タイトル行
    name_cell = ws.cell(row=1, column=1)
    name_cell.value = '開催番号'

    for i in range(2, USER_COUNT + 2):
        name_cell = ws.cell(row=1, column=i)
        name_cell.value = f'ユーザ_{i - 1}'

    # 開催回数列
    for i in range(2, GROUP_COUNT + 2):
        group_cell = ws.cell(row=i, column=1)
        group_cell.value = i - 2

    # 54人分のデータ
    for i in range(2, USER_COUNT + 2):
        ok = copy.deepcopy(OK)
        ng = copy.deepcopy(NG)
        total = ok + ng
        random.shuffle(total)

        # 18回の情報を埋める
        row = 2
        while total:
            result = total.pop()
            cell = ws.cell(row=row, column=i)
            cell.value = result

            row += 1

    wb.save(FILE_NAME)


if __name__ == "__main__":
    main()

 

テストコード

関数 divide_users() の戻り値が条件を満たしているかを確認します。

import unittest

import openpyxl

from make_data import FILE_NAME, USER_COUNT, GROUP_COUNT
from optimization import divide_users


class TestOptimization(unittest.TestCase):
    def create_ok_list(self):
        ok_list = {i: [] for i in range(18)}

        wb = openpyxl.load_workbook(FILE_NAME)
        ws = wb.worksheets[0]

        for col_index in range(2, USER_COUNT + 2):
            user_name = ws.cell(row=1, column=col_index).value

            for row_index in range(2, GROUP_COUNT + 2):
                if ws.cell(row=row_index, column=col_index).value == 1:
                    ok_list[row_index - 2].append(user_name)

        return ok_list

    def test_divide_users(self):
        group = divide_users()

        user_set = set()
        for users in group:
            # 1当番3名か
            self.assertEqual(len(users), 3)

            # 1回のみ割り当てられているか
            for user in users:
                self.assertNotIn(user, user_set)
                user_set.add(user)

        # 自分の希望した場所のみか
        ok_list = self.create_ok_list()
        for i, users in enumerate(group):
            for user in users:
                self.assertIn(user, ok_list[i])


if __name__ == '__main__':
    unittest.main()

 
実行するとテストをパスするため、これで良さそうです。

% python test_optimization.py 
.
----------------------------------------------------------------------
Ran 1 test in 0.154s

OK

 

ソースコード

Githubに上げました。

https://github.com/thinkAmi/toban_kuji

2020年の振り返りと2021年の目標

例年通り、2020年の振り返りと2021年の目標っぽいものを書いてみます。

 

2020年の振り返り

2019年の振り返りと2020年の目標 - メモ的な思考的なで立てた目標を振り返ってみます。

 

色々な分野の素振り

2020年は公私ともにいろんな言語やフレームワークにさわりました。

深さはそれぞれですが、以下のような感じでした。

 

レーニングの復活

まずはウォーキングから、ということで2020年の新年から始めました。

ドラクエウォークをお供にウォーキングを続けたところ、360万歩程度歩けました。

f:id:thinkAmi:20210101203247p:plain

 
夏過ぎには1日49kmくらい歩いたり、ランニングもできるようになりました。

 

引き続き、何らかの試験を受ける

残ってたITストラテジスト試験を受けようと思いましたが、2021年春に延期されたため、結局何も受けず...
IPA 独立行政法人 情報処理推進機構:情報処理技術者試験:令和3年度春期試験の実施予定について

 

その他

COVID-19の影響もあり、2020年は全般的にパブリックな活動が減ってしまいました。

イベント

ほとんどオンラインでしたが通しでは参加できず、Blogにも残せませんでした。

オフラインの方が時間確保しやすいのを実感しました。

 

GitHub

2020年はアウトプットが足りてないですね...

f:id:thinkAmi:20201231123830p:plain

 

2021年の目標っぽいもの

新年から新天地ということもあり、今年も自分の中で完結できる

  • 新しい環境に溶け込む
  • レーニングの継続と筋トレの復活

を目標っぽいものにします。

 
というところで、今年もよろしくお願いします。

Raspberry Pi と python-cec で、HDMI CEC を経由してテレビの電源ON/OFFや音量調整を行う

先日Google Nest miniをお迎えしたので、家のテレビを操作してみようと思ったところ、手元のテレビでは直接の操作に対応していませんでした。

Google Nest mini以外でもテレビを操作する方法がないかを調べたところ、HDMI CECを使えばいけそうでした。

 
手元にある道具では、Raspberry PiとテレビをHDMIで接続し、libceccec-client を使えば良さそうでした。
Raspberry Pi のcec 制御とHDMIのオンオフ - それマグで!

 
せっかくなのでPythonでlibcecを直接扱う方法がないかを調べたところ python-cec がありました。READMEには「libcec bindings for Python」と書かれていました。
trainman419/python-cec

最新リリースは 2018/11/9 でしたが、Github上では今年もcommitされていたため、試してみることにしました。

なお、久しぶりにRaspberry Piをさわるため、セットアップするところからメモに残します。

 
目次

 

環境

開発はWindows上で、実行はRaspberry Piで行います。

Raspberry Pi

 

Windows 10

 

テレビ

 

ネットワーク構成図

Windows10 - Raspberry Pi間は有線LAN、Raspberry Pi - テレビ間はHDMIとします。

-----------------------------------------
Windows10
[IPアドレス:DHCP (192.168.0.xxx)]
-----------------------------------------
|
(LANケーブル)
|
-----------------------------------------
スイッチングハブ
-----------------------------------------
|
(LANケーブル)
|
-----------------------------------------
(`eth0` : オンボードLANアダプタ)
Raspberry Pi 2 Model B
[IPアドレス:固定 (192.168.0.50)]
-----------------------------------------
|
(HDMIケーブル)
|
-----------------------------------------
テレビ
-----------------------------------------

 

事前準備

Windows Terminalの準備

以下を参考に、Windows TerminalをMicrosoft storeからインストールします。また、自分がPowerShellに慣れていないため、デフォルトで cmd.exe が動くように切り替えます。
Windows Terminal Tips - Qiita

 

Raspberry Pi ImagerによるOS書き込み

最近は Raspberry Pi Imager を使ってmicroSDにOSを書き込むようになっていました。
「圧倒的に速い」──ラズパイにOSをインストールする新ツール「Raspberry Pi Imager」 (1/2) - ITmedia NEWS

今回はCLIでしかラズパイを使いませんが、ひとまず Raspbian を選んで書き込んでおきます。

 

Raspberry PiにてSSHを許可

以下を参考に、WindowsにOSの入ったmicroSDを接続し、Windows Terminalを使ってmicroSD上に ssh ファイルを置いておきます。

# microSDへ移動
>cd /d E:\

# 空のsshファイルを作成
E:\>cd . > ssh

# 確認
E:\>dir
 ドライブ E のボリューム ラベルは boot です
...
2020/12/31  09:08                 0 ssh

 
ちなみに、macの場合は以下のコマンドでsshファイルを作成します。

% touch /Volumes/boot/ssh

 

Raspberry Piの起動と接続確認

OSの入ったmicroSDRaspberry Piに接続し、電源を入れます。

その後、Windows TerminalからSSHで接続確認をします。

# パスワード認証によるSSH接続
>ssh pi@raspberrypi.local
pi@raspberrypi.local's password: <raspberry>

# Raspberry Piのバージョン確認
$ lsb_release -a
No LSB modules are available.
Distributor ID: Raspbian
Description:    Raspbian GNU/Linux 10 (buster)
Release:        10
Codename:       buster

 

Raspberry Piのログインを公開鍵認証に切り替え

パスワード認証から公開鍵認証に切り替えます。

 

Windows TerminalでSSH鍵の生成
# ssh-kegenで生成し、ログインユーザの .ssh フォルダにSSH鍵 pi_rsa を入れる
>ssh-keygen -t rsa -b 4096 -f %USERPROFILE%/.ssh/pi_rsa

# パスフレーズなし
Enter passphrase (empty for no passphrase):

 

SSH用公開鍵をRaspberry Piに登録

Raspberry Piに公開鍵でSSHするために、Windowsで作成した公開鍵を登録します。

しかし、Windowsには ssh-copy-id コマンドがありません。

代替案はいくつかあるようです。
Is there an equivalent to ssh-copy-id for Windows? - Server Fault

上記方法でも良いのですが、手元に何かないかなと思ったところ、Git bashがインストールされていることを思い出しました。

Git bashには ssh-copy-id コマンドがあったため、使ってみます。

$ ssh-copy-id -i ~/.ssh/pi_rsa.pub pi@raspberrypi.local
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
pi@raspberrypi.local's password:

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'pi@raspberrypi.local'"
and check to make sure that only the key(s) you wanted were added.

 
登録できたようなので、Windows Terminalから公開鍵方式でログインしてみます。

# 公開鍵認証によるSSH
>ssh -i %USERPROFILE%/.ssh/pi_rsa pi@raspberrypi.local

# Raspberry Piのバージョン確認
$ lsb_release -a
No LSB modules are available.
...

 

Raspberry Pivimを入れる

デフォルトでは vim-tiny なので、 vim に差し替えます。
RaspberryPi3のセットアップ続き〜VimやNFS設定 - Qiita

# アンインストール
$ sudo apt-get --purge remove vim-common vim-tiny

# vimをインストール
$ sudo apt-get install vim

 

Raspberry Piを固定IP化

以前行ったとおり、 /etc/dhcpcd.conf を修正し、固定IP化します。
Python2 + Scapyで、Raspberry Pi 2 Model B をブリッジにできるか試してみた #router_jisaku - メモ的な思考的な

# /etc/dhcpcd.conf を開く
$ vi /etc/dhcpcd.conf

# 末尾に追加
interface eth0
static ip_address=192.168.0.50/24
static routers=192.168.0.1
static domain_name_servers=192.168.0.1

 
追加した内容で有効化します。

$ sudo service dhcpcd reload

 
IPアドレスが変更となるので、Windows TerminalのSSH接続が切れます。そのため、再接続します。

> ssh -i %USERPROFILE%/.ssh/pi_rsa pi@raspberrypi.local

 

/boot/config.txtの編集

デフォルトでは、Raspberry PiHDMI接続したときにCEC信号が送られてしまうため、それを無効化しておきます。

$ vi /boot/config.txt

# 以下を追加
hdmi_ignore_cec_init=1

 

Raspberry PiのデフォルトのPythonをPython3にする

Raspberry PiのデフォルトのPythonのバージョンを見たところ、Python2系でした。

そのため、デフォルトをPython3系へと切り替えます。インストール済はPython3.7でしたが、今回扱う範囲では問題なかったので、Python3系の最新にはしません。
RaspberryPiでPythonのデフォルトをPython2.7からPython3に変更する | そう備忘録

# シンボリックリンクの確認
$ ls -l /usr/bin | grep python
...
lrwxrwxrwx 1 root root          7 Mar  4  2019 python -> python2
...
lrwxrwxrwx 1 root root          9 Mar 26  2019 python3 -> python3.7
...

# 変更
$ cd /usr/bin
pi@raspberrypi:/usr/bin $ sudo unlink python
pi@raspberrypi:/usr/bin $ sudo ln -s python3 python

# バージョン確認
$ python --version
Python 3.7.3

 

cec-clientのインストールと動作確認

まずは、Raspberry PiからHDMI CECを使った操作ができるかを確認します。

cec-clientは cec-utils に含まれるため、インストールします。

$ sudo apt-get update
$ sudo apt-get upgrade -y
$ sudo apt-get install cec-utils -y

 
cec-clientの動作確認をします。

$ sudo cec-client -l
libCEC version: 4.0.4, compiled on Linux-4.15.0-48-generic ... , features: P8_USB, DRM, P8_detect, randr, RPi, Exynos, AOCEC
Found devices: 1

device:              1
com port:            RPI
vendor id:           2708
product id:          1001
firmware version:    1
type:                Raspberry Pi

 
cec-clientを使った操作ですが、Raspberry Pitvservice をoffにしておかないと動作しません。

$ echo "scan" | cec-client -d 1 -s

# エラーが出て動かない
log level set to 1
opening a connection to the CEC adapter...
ERROR:   [             421]     RegisterLogicalAddress - CEC is being used by another application. Run "tvservice --off" and try again.
ERROR:   [             421]     Open - vc_cec could not be initialised
ERROR:   [             421]     could not open a connection (try 1)

 
そこで、 tvserviceをoffにします。
Raspberry Pi Documentation

$ tvservice -o
Powering off HDMI

 
再度実行すると、scanや電源ON/OFFができました。

# Scan
$ echo "scan" | cec-client -d 1 -s
log level set to 1
opening a connection to the CEC adapter...
requesting CEC bus information ...
CEC bus information
===================
device #0: TV
address:       0.0.0.0
active source: no
vendor:        Unknown
osd string:    TV
CEC version:   1.4
power status:  standby
language:      ???


device #1: Recorder 1
address:       1.0.0.0
active source: no
vendor:        Pulse Eight
osd string:    CECTester
CEC version:   1.4
power status:  on
language:      eng


currently active source: unknown (-1)

# 電源ON
$ echo 'on 0' | cec-client -s 
...
DEBUG:   [            1814]     >> TV (0) -> Recorder 1 (1): report power status (90)
DEBUG:   [            1814]     expected response received (90: report power status)
DEBUG:   [            1814]     << requesting vendor ID of 'TV' (0)
DEBUG:   [            1814]     'give device vendor id' is marked as unsupported feature for device 'TV'
NOTICE:  [            1814]     << powering on 'TV' (0)
TRAFFIC: [            1815]     << 10:04
DEBUG:   [            1906]     TV (0): power status changed from 'standby' to 'in transition from standby to on'

# 電源OFF (スタンバイ)
$ echo 'standby 0' | cec-client -s
...
TRAFFIC: [            2590]     >> 01:9f
DEBUG:   [            2591]     >> TV (0) -> Recorder 1 (1): get cec version (9F)
TRAFFIC: [            3351]     >> 0f:36
DEBUG:   [            3351]     TV (0): power status changed from 'on' to 'standby'
DEBUG:   [            3351]     >> TV (0) -> Broadcast (F): standby (36)

 

python-cecを使った操作

ここからが本題です。

今回はpython-cecを使い、Raspberry Piからテレビを操作します。
trainman419/python-cec

 

Windows上のPyCharmのPythonインタプリタRaspberry PiPythonにする

Raspberry Pi上で実装しても良いですが、せっかくので、ローカルのWindows上で実装したものをRaspberry Pi上で実行することにします。

なお、この方法はPyCharm Professionalが必要です。

 

Raspberry Pi上でvenv上にpython-cecを入れる
# ディレクトリを作り移動
pi@raspberrypi:~ $ mkdir projects
pi@raspberrypi:~ $ cd projects/
pi@raspberrypi:~/projects $ mkdir python_cec_sample
pi@raspberrypi:~/projects $ cd python_cec_sample

# venv環境を作る
$ python -m venv env
$ source env/bin/activate

# python-cecを入れる
$ pip install cec --no-cache-dir
Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Collecting cec
  Downloading https://www.piwheels.org/simple/cec/cec-0.2.7-cp37-cp37m-linux_armv7l.whl (146kB)
    100% |████████████████████████████████| 153kB 268kB/s
Installing collected packages: cec
Successfully installed cec-0.2.7

 

Windows上でPyCharmからプロジェクトを作成する

File > New Project... から新しいPythonプロジェクトを作成します。

設定は以下のようにします。

  • 左ペインで Pure Python を選択
  • 右側の Location に、ローカルに保存する場所(例:D:\projects\python_cec_sample) を指定
  • Python Interpreter欄にある、 Previously configured Interperter の右ボタンより、 Add Python Interpreter へ遷移
  • SSH Interpreter を選択
  • Existing server configuration の右ボタンより SSH configurations へ遷移
  • + を押して追加
  • 設定内容
    • Host: 上記で設定したRaspberry Piの固定IP (192.168.0.50)
    • Port: 22
    • Username: pi
    • Authentication type: Key pair
    • private key file: 上記で作ったprivateキー pi_rsa の場所
    • Passphrase: 空欄
  • Test connectionをクリック、接続できればOKとする
  • Connected to pi@192.168.0.50:22 のInterpriter指定は、Raspberry Piのvenv環境のPythonを指定 (/home/pi/projects/python_cec_sample/env/bin/python)
  • Execute code using this interpreter with root privileges via sudo にチェックを入れる
  • Remote project locationには、 /home/pi/projects/python_cec_sample を指定

 
上記により、PyCharm上で import ce と入力したときの補完が効くようになります。

もし補完が効かない場合は、以下を参考にリモートの再読み込みを行います。
【PyCharm】リモートインタプリタでライブラリ追加した際に正しく認識させる方法 | ゆとって生きたい。

  • Project Interpreterの歯車マークで Show All... を選択
  • ツリーマーク (Show paths for the selected interpreter) をクリック
  • リフレッシュマーク (Reload List of Paths) をクリック

 

Pythonスクリプトの作成

python-cecのREADMEを読むと、できることが一通り書かれています。

そのため、以下のようなPythonスクリプトを用意します。

このPythonスクリプトを実行すると、テレビの電源ON/OFFや音量調整ができたり、情報を出力できました。

import cec

def main():
    cec.init()

    tv = cec.Device(cec.CECDEVICE_TV)

    # 電源が入っているか
    print(tv.is_on())
    # => True / False


    # if tv.is_on():
    #     # 電源がONの場合、次はスタンバイにする
    #     tv.standby()
    # else:
    #     # 電源が入っていない場合、電源を入れる
    #     tv.power_on()

    # ベンダ
    print(tv.vendor)
    # => 000000

    # 言語
    print(tv.language)
    # => ??? (電源ONの場合は、jpn)

    print(tv.osd_string)
    # => TV

    print(tv.cec_version)
    # => 1.4

    # 音量周りは、一度にどちらかだけ
    # 音量を一段階上げる
    # cec.volume_up()
    # 音量を一段階下げる
    cec.volume_down()


if __name__ == '__main__':
    main()

 

ソースコード

Githubに上げました。
https://github.com/thinkAmi-sandbox/python_cec-sample

ダンボールに入れた本を管理するDjangoアプリ「danborary」を作った

家の本棚スペースが限られているため、年末の大掃除であまり読まなくなった本をダンボールに詰めようと考えました。

ただ、何も考えずに詰めると、どの箱に何の本があるか分からなくなります。

そこで、今年さわった技術を使って、ダンボールに入れた本を管理するDjangoアプリ danborary を作りました*1

 

目次  

 

環境

 

機能概要

  • ダンボールに管理用バーコードを貼り付けるため、印刷用バーコードラベルをpdf形式で作成する
  • 書籍のISBNを元に、国立国会図書館サーチの検索APIを使い、タイトルなどを取得する
  • ダンボールの管理用バーコードと書籍のISBNを紐付けて、SQLiteへ保存する
  • ダンボールに詰めた本は、jQuery Datatablesによりグリッドで表示する
  • できる限り、バーコードの読み取りだけで保存までできるUIにする

 

機能詳細

ラベル印刷

印刷用バーコードラベルで使うバーコード形式は、

  • 手元にあるバーコードスキャナが1次元しか対応していない
  • ダンボールの量はそんなに多くない

ことから、Code39としました。

 
Djangoアプリでpdfを作成する方法は以前やった方法を流用します。
Django + ReportLabをHerokuで動かしてpdfを表示する - メモ的な思考的な

印刷用ラベルは、A-oneの品番 28923 を使い、A4サイズに1片70mm×42.3mmのシールとして用意します。
[ 28923:ラベルシール[インクジェット] ] - 商品情報|ラベル・シールのエーワン

以下の記事を参考に、ラベルの中心あたりに印字するように調整します。
PDFをpythonで生成してみる

また、ラベルをなくしてしまっても再生成できるよう、URLで開始番号を指定します (http://localhost:8000/packing/print/start/<開始番号>/)。

できたものはこんな感じです。

f:id:thinkAmi:20201228104120j:plain

 

メニュー

印刷と箱詰めを切り替えられるよう、メインメニューを用意します。

f:id:thinkAmi:20201228103041j:plain

 

箱詰め画面

BootstrapとjQuery Datatablesを使った画面です。

f:id:thinkAmi:20201228103137j:plain

 
一番上の検索欄に書籍のISBNを読み込ませると、国立国会図書館の検索APIへリクエストし、書籍データを画面に反映します。

なお、検索APIを短期間で多数使うと問題があるかもしれないので、強制的に1秒くらい time.sleep させています。

また、同じISBNを読み込んだ場合は、検索APIを使わずにデータベースの内容を取得するようにしています。

f:id:thinkAmi:20201228103422j:plain

 
あとは、巻数を入力して登録します。

国立国会図書館の検索APIで巻数が取れなかったのですが、

  • 手元に1から始まるバーコードを用意しておけば、バーコードを読み込ませるだけで済む
  • 箱詰めする書籍はそんなに多くない

と考えて、自分で入力する形としました。

登録するとこんな感じになります。

f:id:thinkAmi:20201228103940j:plain

 

作らなかった機能

年末の大掃除前についカッとなって作ったDjangoアプリであり、以前からあたためていたものではありません。

そのため、「年末の大掃除」という絶対の納期を守らないといけないことから、いくつかの機能を省いています。

 

削除機能

DB自体を消せばいいし、ということで削除機能は用意しませんでした。

 

所有している本の管理機能

本を管理するには本自体にもバーコードを貼る必要がありそうでした。

ただ、今回は「ダンボールに入れた本を管理する」という目的だったので、工数がかかりそうなこの機能は不要と考えました。

 

技術的なところ

国立国会図書館の検索API用ライブラリについて

APIの仕様書を見ると、いくつか用意されているインタフェースのレスポンスは、いずれもXML形式のようでした。
API仕様の概要 « 国立国会図書館サーチについて

そこでラッパーライブラリがないかを探してみたところ、 pyndlsearch があったため、使うことにしました。
https://github.com/nocotan/pyndlsearch

 

DjangoアプリのView

今回はグリッド表示でjQuery Datatablesを使うことから、その部分のViewは django-datatables-view で実装しました。
https://bitbucket.org/pigletto/django-datatables-view/

一方、今後 jQueryから別のライブラリに移行するかもしれないことを考え、他の部分はDjango REST frameworkのViewで実装し、アプリも分けておきました。

 

ソースコード

Githubに上げました。
https://github.com/thinkAmi/danborary

*1:ダンボールとライブラリ(図書館)を組み合わせた名前にしました

Djangoとyamdlにより、fixtureを使わずにYAMLとモデルを紐付ける

これは JSL(日本システム技研) Advent Calendar 2020 - Qiita の12/21分の記事です。

ちょっとしたDjangoアプリを作る中で、

  • モデルのデータソースはYAMLにしたい
    • YAMLはマスタ的存在
    • モデル間のリレーションは存在しない
  • YAMLを書き換えた場合、Djangoアプリを再起動するだけで、データが反映されるようにしたい
    • migrateやfixtureのようなコマンドを使いたくない
  • YAMLに対して、QuerySetの抽出系メソッドを使いたい

ということがありました。

DBの代わりに、YAMLをモデルと紐付けられないかを調べましたが、標準ではそれらしいものがありませんでした。

そこでライブラリがないかを探したところ、 yamdl がありました。
andrewgodwin/yamdl: ORM-queryable YAML fixtures for Django

2017/8以降にコミットがないものの、試してみたところ希望通りの動作だったため、メモを残します。

 
目次

 

環境

 

モデルのデータソースとするYAML

りんごとその種子親・花粉親がまとまっているYAMLがあり、ここからデータをQuerySet経由でデータを抽出したいとします。

- name: 'シナノゴールド'
  seed: 'ゴールデンデリシャス'
  pollen: '千秋'

- name: 'フジ'
  seed: '国光'
  pollen: 'デリシャス'

- name: 'シナノゴールド'
  seed: 'ゴールデンデリシャス'
  pollen: '千秋'

- name: '秋映'
  seed: '千秋'
  pollen: 'ツガル'

- name: '王林'
  seed: 'ゴールデンデリシャス'
  pollen: '印度'

 

Djangoアプリの作成

Djangoプロジェクトの作成

いつも通り作成します。

$ django-admin startproject config .
$ python manage.py startapp myapp

 

モデル

上記のYAMLファイルに対応した項目を持つモデルを作成します。

yamdlのREADMEに従い、モデルの中に __yamdl__ = True と指定することで、YAMLをデータソースとしたモデルであると明示します。

また、いつロードされたのかを把握するため、 created_at で登録日時を保持しておきます。

from django.db import models
from django.utils import timezone


class Apple(models.Model):
    name = models.CharField('名前', max_length=20)
    seed = models.CharField('種子親', max_length=20)
    pollen = models.CharField('花粉親', max_length=20)

    created_at = models.DateTimeField(default=timezone.now)

    # yamdl用の設定を追加
    __yamdl__ = True

 

settings.py

いくつか追加します。

INSTALLED_APPS
INSTALLED_APPS = [
    # 自分のDjangoアプリ
    'myapp.apps.MyappConfig',
    ...
    # yamdl用
    'yamdl',
]

 

DATABASES

今回はSQLiteファイル自体を生成しないようにするため、defaultで設定してあるSQLiteをインメモリへと変更します。

また、yamdl用のエントリも追加します。

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': ':memory:',
    },
    'yamdl': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'file:yamdl-db?mode=memory&cache=shared',
    }
}

 

YAMDL_DIRECTORIES

データソースとなるYAMLファイルの置き場を指定します。

今回はBASE_DIRの下に content ディレクトリを指定し、その中にYAMLファイルを置くことにします。

YAMDL_DIRECTORIES = [
    BASE_DIR / 'content',
]

 

DATABASE_ROUTERS

こちらはREADMEそのままです。

DATABASE_ROUTERS = [
    "yamdl.router.YamdlRouter",
]

 

View

今回は、全件取得してJSONを返すようにします。

日本語をそのまま表示するために ensure_ascii を使います。

また、モデルの項目 created_at がDateTime型なので、シリアライズ可能にするために cls も指定しておきます。
https://docs.djangoproject.com/ja/3.1/topics/serialization/#serialization-formats-json

from django.core.serializers.json import DjangoJSONEncoder
from django.http import HttpResponse
from django.views import View
from myapp.models import Apple

class AppleView(View):
    def get(self, request, *args, **kwargs):
        apples = Apple.objects.all().values()

        data_json = json.dumps(list(apples), ensure_ascii=False, cls=DjangoJSONEncoder)
        return HttpResponse(data_json, content_type='application/json')

 

urls.py

よくある形です。

config/urls.py

urlpatterns = [
    path('', include('myapp.urls')),
]

 
myapp/urls.py

urlpatterns = [
    path('', AppleView.as_view()),
]

 

YAMLファイル

settings.pyの YAMDL_DIRECTORIES で指定した通り、 manage.py と同じ階層に content ディレクトリを作ります。

その下に、 <Djangoアプリ名>.<モデル名>ディレクトリを作成します。今回は myapp.Apple となります。

さらにその下に、データソースであるYAMLファイル (今回はapple.yaml) を置きます。

全体としてはこんな感じになります。

$ tree -I env
.
...
├── content
│   └── myapp.Apple
│       └── apple.yaml
├── manage.py
...

 

マイグレーションファイルの作成

今回はマイグレーションファイルのみ作成し、マイグレーションは行いません。

$ python manage.py makemigrations

 

ローカルでの動作確認

runserver後に localhost:8000 を確認すると、YAMLの内容がJSONレスポンスとして表示されました。

[
    {
        "id": 1,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-17T13:08:27.095Z"
    },
    {
        "id": 2,
        "name": "フジ",
        "seed": "国光",
        "pollen": "デリシャス",
        "created_at": "2020-12-17T13:08:27.098Z"
    },
    {
        "id": 3,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-17T13:08:27.098Z"
    },
    {
        "id": 4,
        "name": "秋映",
        "seed": "千秋",
        "pollen": "ツガル",
        "created_at": "2020-12-17T13:08:27.099Z"
    },
    {
        "id": 5,
        "name": "王林",
        "seed": "ゴールデンデリシャス",
        "pollen": "印度",
        "created_at": "2020-12-17T13:08:27.099Z"
    }
]

 

Herokuでの動作確認

ローカルで動作したものがHerokuでも動作するようであれば、読込専用データを表示するためだけにHeroku Postgresを使わなくても良さそうです。

そのため、Herokuでも動作確認をしてみました。

 
Herokuアプリとしてのセットアップは以下を参考に行いました。
DjangoアプリをHerokuにデプロイする方法 - Qiita

主な内容は以下の通りです。

  • ファイルの追加
    • .gitignore
    • requirements.txt
    • Procfile
    • runtime.txt
  • 設定変更
    • settings.pyで ALLOWED_HOSTS = ['*']
  • Herokuの設定変更
    • 今回はcollectstaticを使わないため、 heroku config:set DISABLE_COLLECTSTATIC=1 を実行

 

デプロイ後、時間をおいて確認してみたところ、 created_at 以外は同じデータが表示されました。想定通りでした。

初回

[
    {
        "id": 1,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-17T14:53:44.772Z"
    },
    {
        "id": 2,
        "name": "フジ",
        "seed": "国光",
        "pollen": "デリシャス",
        "created_at": "2020-12-17T14:53:44.812Z"
    },
    {
        "id": 3,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-17T14:53:44.812Z"
    },
    {
        "id": 4,
        "name": "秋映",
        "seed": "千秋",
        "pollen": "ツガル",
        "created_at": "2020-12-17T14:53:44.813Z"
    },
    {
        "id": 5,
        "name": "王林",
        "seed": "ゴールデンデリシャス",
        "pollen": "印度",
        "created_at": "2020-12-17T14:53:44.814Z"
    }
]

 

時間をおいた後

[
    {
        "id": 1,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-21T23:16:06.099Z"
    },
    {
        "id": 2,
        "name": "フジ",
        "seed": "国光",
        "pollen": "デリシャス",
        "created_at": "2020-12-21T23:16:06.123Z"
    },
    {
        "id": 3,
        "name": "シナノゴールド",
        "seed": "ゴールデンデリシャス",
        "pollen": "千秋",
        "created_at": "2020-12-21T23:16:06.124Z"
    },
    {
        "id": 4,
        "name": "秋映",
        "seed": "千秋",
        "pollen": "ツガル",
        "created_at": "2020-12-21T23:16:06.125Z"
    },
    {
        "id": 5,
        "name": "王林",
        "seed": "ゴールデンデリシャス",
        "pollen": "印度",
        "created_at": "2020-12-21T23:16:06.125Z"
    }
]

 

ソースコード

Githubに上げました。
https://github.com/thinkAmi-sandbox/django_yamdl-sample

django-cteと共通テーブル式(CTE)を用いた再帰クエリにより、階層構造を持つテーブルからデータを取得する

これは Django Advent Calendar 2020 - QiitaJSL(日本システム技研) Advent Calendar 2020 - Qiita の12/16分の記事です。

 
Django共通テーブル式(Common Table Expression、CTE)を用いた再帰クエリを使って、階層構造を持つテーブルからデータを取得したいことがありました。

ただ、現在のDjangoでは「共通テーブル式再帰クエリ」がサポートされていません。
#28919 (Add support for Common Table Expression (CTE) queries) – Django

SQLを書いても良いのですが、IDEのサポートがほしかったのでライブラリを探したところ、 django-cte がありました。
dimagi/django-cte: Common Table Expressions (CTE) for Django

そこで、django-cteと共通テーブル式を用いた再帰クエリを使った時のメモを残します。

 

目次

 

環境

 

そもそもやりたかったこと

リンゴの親子関係という階層構造を持つデータがあり、RDBに階層構造を保持したいとします*1*2

.
├── 東光
│   └── 千秋
│       ├── シナノゴールド
│       │   └── 奥州ロマン
│       └── 秋映
└── 国光
    └── フジ
        └── シナノスイート

 
また、この階層構造の途中のデータを取得すると、その祖先のデータもすべて取得したいとします。

例えば、「シナノゴールド」を指定すると、祖先の「千秋」「東光」も取得したいとします。

 

どうやって実現するか

階層構造をRDBに保持する方法としては、書籍

にていくつか言及があります。

 
ただ、今回やりたいことは比較的単純な階層構造であることに加え、SQLグラフ原論にて

RDB/SQLで階層構造を表現するメジャーな手段は、現在においても隣接リストモデルであるのは、動かしがたい事実

プログラマのためのSQLグラフ原論(初版第1刷) 付録 訳者による解説 (ミック) p311

と書かれていることから、隣接リストモデルにて表現します。

 
また、隣接リストからの取り出しについては、SQLアンチパターン

また、隣接リストに格納された階層構造をサポートするsQL拡張機能を備えているデータベース製品もあります。SQL-99標準では、WITHキーワードの後に共通テーブル式(Common Table Expression: CTE) を指定する形式の再帰クエリ構文を定義しています。

共通テーブル式を用いた再帰クエリは、Microsoft SQL Server 2005、Oracle Database 11g、IBM DB2MySQL 8.0、PostgreSQL 8.4、SQLite 3.8.3、Firebird 2.1 でサポートされています

SQLアンチパターン(初版第10刷) 2章ナイーブツリー(素朴な木) p19

とあります。

Djangoが公式サポートしているRDBは、いずれも上記に含まれています。
Databases | Django documentation | Django

そこで今回は、共通テーブル式を用いた再帰クエリにて実装することとします。

 

Djangoでの実装

モデル

RDBに保存するため、Djangoのモデルを定義します。

今回は

という構造とします。

なお、外部キー「親のサロゲートキー」では自己参照となりますが、Djangoでは ForeignKeyself を渡すことで可能になります。
https://docs.djangoproject.com/ja/3.1/ref/models/fields/#foreignkey

from django.db import models

class Apple(models.Model):
    name = models.CharField('名前', max_length=20)
    parent = models.ForeignKey('self',
                               on_delete=models.SET_NULL,
                               null=True,
                               blank=True)
    class Meta:
        db_table = 'apple'

 
このモデルの中身は以下を想定しています。

id name parent
1 東光 NULL
2 千秋 1
3 シナノゴールド 2
4 奥州ロマン 3
5 秋映 2
6 国光 NULL
7 フジ 6
8 シナノスイート 7

 
なお、親へさかのぼれないリンゴは、parentNULL を設定しています。

NULLを使ったのは、書籍「プログラマのためのSQLグラフ原論」のp24にも「最もよくある表現」と書かれていたためです。

もし他の値を設定したい場合は、同書の同ページにて言及されています。

 

共通テーブル式を用いた再帰クエリの書き方

モデルができたので、次は取得するクエリを作成します。

まずは、Djangoで生SQL版を実装する前に、共通テーブル式を用いた再帰クエリの書き方を見ていきます。

共通テーブル式はSQL99に含まれます。
新しい業界標準「SQL99」詳細解説

イメージ的にはこんな感じです。

WITH RECURSIVE <table> (<field>, ...)  /* 集めたデータを入れるテーブルとその項目 */
AS (
        /* 起点となるレコードを抽出する箇所 */
    UNION ALL
       /* 再帰してレコードを抽出する箇所 */
) 
SELECT * FROM <table>;  /* 集めたデータに対する処理 */

 

Djangoの生SQLで抽出

上記SQLイメージを元に、Djangoでの生SQL版を実装します。

 

集めたデータを入れるテーブルとその項目

共通テーブル名として、今回は tree としました。

また項目については、Appleモデルの idnameparent_id を用意します *3

それに加え、何階層さかのぼっているのかを確認するための項目 node を用意します*4

WITH RECURSIVE tree
    (node, id, name, parent_id)

 

起点となるレコードを抽出する箇所

起点となるレコードを特定するため、WHERE句を用意したSQLになります。

SELECT 0 AS node, base.id, base.name, base.parent_id
FROM apple AS base
WHERE base.id = %s

ここでは起点となるレコードなので、 node には 0 という固定値を設定します。

また、別の箇所で apple テーブルからの抽出を行うため、 AS で別名を付けておきます。

他に、WHERE句に %sプレースホルダーを用意します。
https://docs.djangoproject.com/ja/3.1/topics/db/sql/#passing-parameters-into-raw

 

再帰してレコードを抽出する箇所

起点と UNION ALL するSQLになります。

自身の id と共通テーブルの parent_id で INNER JOIN します。

SELECT tree.node + 1 AS node, 
       apple.id,
       apple.name,
       apple.parent_id
FROM apple
    INNER JOIN tree
        ON apple.id = tree.parent_id

 

集めたデータに対する処理

こちらの普通のSELECTです。

SELECT * 
FROM tree
ORDER BY node;

 

SQLの全体像

こんな感じになりました。

WITH RECURSIVE tree
    (node, id, name, parent_id)
AS (
        SELECT 0 AS node, base.id, base.name, base.parent_id
        FROM apple AS base
        WHERE base.id = %s
    UNION ALL
        SELECT tree.node + 1 AS node, 
               apple.id,
               apple.name,
               apple.parent_id
        FROM apple
            INNER JOIN tree
                ON apple.id = tree.parent_id
) SELECT * 
  FROM tree
  ORDER BY node;

 

動作確認

のちほど django-cte版も同じになるか確認するため、以下のようなassertするヘルパメソッドを用意します。

def assertCte(self, actual):
    # 件数
    self.assertEqual(len(actual), 3)

    # シナノゴールド自身があること
    own = actual[0]
    self.assertEqual(own.node, 0)
    self.assertEqual(own.name, 'シナノゴールド')

    # シナノゴールドの親(千秋)
    parent = actual[1]
    self.assertEqual(parent.node, 1)
    self.assertEqual(parent.name, '千秋')

    # 千秋の親(東光)
    grandparent = actual[2]
    self.assertEqual(grandparent.node, 2)
    self.assertEqual(grandparent.name, '東光')

 
その後、ヘルパメソッドを使ったテストコードを書いたところ、テストがパスしました。

class TestRecursive(TestCase):
    def test_1_raw_sql(self):
        shinano_gold = Apple.objects.get(name='シナノゴールド')

        apples = Apple.objects.raw(
            """
            WITH RECURSIVE tree
                (node, id, name, parent_id)
            AS (
                    SELECT 0 AS node, base.id, base.name, base.parent_id
                    FROM apple AS base
                    WHERE base.id = %s
                UNION ALL
                    SELECT tree.node + 1 AS node, 
                           apple.id,
                           apple.name,
                           apple.parent_id
                    FROM apple
                        INNER JOIN tree
                            ON apple.id = tree.parent_id
            ) SELECT * 
              FROM tree
              ORDER BY node;
            """
            , [shinano_gold.pk])

        self.assertCte(apples)

 

django-cteでの抽出

次に、django-cte での抽出を試します。

 

インストール

pipでインストールします。

pip install django-cte

 

モデルの objects を差し替え

次に、モデルの objectsCTEManager へ差し替えます。

from django_cte import CTEManager

class Apple(models.Model):
    ...
    objects = CTEManager()  # 追加

    class Meta:
        db_table = 'apple'

 

django-cteのクエリ全体像

READMEの Recursive Common Table Expressions と、生SQLの書き方を見比べると以下のようでした。

# WITH RECURSIVE ... AS() を関数化
def make_regions_cte(cte):
    return Region.objects.filter(
    ...
    ).union(
        ...
        all=True,
    )

# Withに割り当て
cte = With.recursive(make_regions_cte)

# 共通テーブルからの抽出
regions = (...)

 
そこで、コメントしたそれぞれの機能を実装していきます。

 

WITH RECURSIVE ... AS() を関数化

With.recursive() に渡す部分を関数化します。

recursive()関数のソースコードを読むと

:param make_cte_queryset: Function taking a single argument (a
not-yet-fully-constructed cte object) and returning a `QuerySet`
object. The returned `QuerySet` normally consists of an initial
statement unioned with a recursive statement.

https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L42

とあったため、QuerySetではなくQuerySetを返す関数を用意すれば良さそうです。

 
まずは起点となる部分の絞り込みを作ります。

shinano_gold = Apple.objects.get(name='シナノゴールド')

Apple.objects.filter(
    id=shinano_gold.pk
)

 
続いて、annotate() メソッドを使って、さかのぼり番号 node を追加します。
https://docs.djangoproject.com/ja/3.1/ref/models/querysets/#annotate

Apple.objects.filter(
    id=shinano_gold.pk
# 追加
).annotate(
    node=Value(0, output_field=IntegerField()),

 
最後に UNION ALL 後の部分を追加します。

Apple.objects.filter(
    id=shinano_gold.pk
).annotate(
    node=Value(0, output_field=IntegerField()),
# 追加
).union(
    cte.join(Apple, id=cte.col.parent_id)
       .annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
    all=True,
)

あとはこのQuerySetを返せば関数が完成します。

 

With.recursiveに割り当て

関数を引数として渡すだけです。

cte = With.recursive(make_cte)

 

共通テーブルからの抽出

READMEの場合 With オブジェクトの join() メソッドを使っています。

しかし、今回の場合はWITH RECURSIVE ... AS() を関数化したところで UNION ALL しているため、これ以上のJOINは不要です。

そのため、Withオブジェクトの queryset() メソッドで、ここまで処理してきたQuerySetを取り出し、共通テーブルの処理へとつなげます。
https://github.com/dimagi/django-cte/blob/fede416338ec0c5a967e2f1f902435061ae630e1/django_cte/cte.py#L92

apples = (
    cte.queryset()
       .with_cte(cte)
       .annotate(node=cte.col.node)
    .order_by('node')
)

 

全体像

django-cte版の全体像はこんな感じです。

def make_cte(cte):
    shinano_gold = Apple.objects.get(name='シナノゴールド')

    return Apple.objects.filter(
        id=shinano_gold.pk
    ).annotate(
        node=Value(0, output_field=IntegerField()),
    ).union(
        cte.join(Apple, id=cte.col.parent_id)
           .annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
        all=True,
    )

cte = With.recursive(make_cte)

apples = (
    cte.queryset()
       .with_cte(cte)
       .annotate(node=cte.col.node)
    .order_by('node')
)

 

動作確認

同じように、自作のヘルパメソッド self.assertCte(apples) を使ってテストコードで動作確認したところ、想定通りの動きとなりました。

また、発行されるSQLも想定通りでした。

WITH RECURSIVE cte AS(
    SELECT
        "apple"."id",
        "apple"."name",
        "apple"."parent_id",
        0 AS "node"
    FROM
        "apple"
    WHERE
        "apple"."id" = 3
    UNION ALL
    SELECT
        "apple"."id",
        "apple"."name",
        "apple"."parent_id",
        ("cte"."node" + 1) AS "node"
    FROM
        "apple"
        INNER JOIN
            "cte"
        ON  "apple"."id" = "cte"."parent_id"
)
SELECT
    "cte"."id",
    "cte"."name",
    "cte"."parent_id",
    "cte"."node" AS "node"
FROM
    "cte"
ORDER BY
    "node" ASC

 

その他

django-cteでルートのデータを取得する

今回の場合で言えば、「国光」のみを取得したいとなります。

とはいえ、書き方は上記の場合と変わらず、最初の Apple.objects.filter() の条件が異なるのみです。

以下のテストもパスします。

def test_3_django_cte_root(self):
    def make_cte(cte):
        kokko = Apple.objects.get(name='国光')

        return Apple.objects.filter(
            id=kokko.pk
        ).annotate(
            node=Value(0, output_field=IntegerField()),
        ).union(
            cte.join(Apple, id=cte.col.parent_id)
                .annotate(node=cte.col.node + Value(1, output_field=IntegerField())),
            all=True,
                )

    cte = With.recursive(make_cte)

    apples = (
        cte.queryset()
            .with_cte(cte)
            .annotate(node=cte.col.node)
            .order_by('node')
    )

    self.assertEqual(len(apples), 1)
    apple = apples.get()
    self.assertEqual(apple.node, 0)
    self.assertEqual(apple.name, '国光')

 
発行されるSQLも同じです。

WITH RECURSIVE cte AS(
    SELECT
        "apple"."id",
        "apple"."name",
        "apple"."parent_id",
        0 AS "node"
    FROM
        "apple"
    WHERE
        "apple"."id" = 6
    UNION ALL
    SELECT
        "apple"."id",
        "apple"."name",
        "apple"."parent_id",
        ("cte"."node" + 1) AS "node"
    FROM
        "apple"
        INNER JOIN
            "cte"
        ON  "apple"."id" = "cte"."parent_id"
)
SELECT
    "cte"."id",
    "cte"."name",
    "cte"."parent_id",
    "cte"."node" AS "node"
FROM
    "cte"
ORDER BY
    "node" ASC

 

django-cteの戻り値をdictとして取得したい

WITH RECURSIVE ... AS() を関数化した時の関数の中で、 values() メソッドを使います。

def make_cte(cte):
    shinano_gold = Apple.objects.get(name='シナノゴールド')

    return Apple.objects.filter(
        id=shinano_gold.pk
    # ここで values()
    ).values(
        'id',
        'parent',
        'name',
        node=Value(0, output_field=IntegerField()),
    ).union(
        cte.join(Apple, id=cte.col.parent_id)
            # こちらもvalues()
            .values(
                'id',
                'parent',
                'name',
                node=cte.col.node + Value(1, output_field=IntegerField())),
        all=True,
    )

 
dictなのでテストコードが少し変わります。

self.assertEqual(len(apples), 3)

# シナノゴールド自身があること
own = apples[0]
self.assertEqual(own['node'], 0)
self.assertEqual(own['name'], 'シナノゴールド')

# シナノゴールドの親(千秋)
own = apples[1]
self.assertEqual(own['node'], 1)
self.assertEqual(own['name'], '千秋')

# 千秋の親(東光)
own = apples[2]
self.assertEqual(own['node'], 2)
self.assertEqual(own['name'], '東光')

 

ソースコード

Githubに上げました。
https://github.com/thinkAmi-sandbox/django_cte-sample

*1:リンゴには「種子親」と「花粉親」がありますが、わかりやすくするため今回は種子親のみの階層構造とします。

*2:実際のところ、東光は「ゴールデンデリシャス x 印度」の交配で生まれていますが、わかりやすくするためそれ以上の親はさかのぼらないとします。参考: 東光 - 青森県の市販のりんごと話題のりんご

*3:SQLなので、モデルのフィールド名「parent」ではなく、実際のテーブル列名「parent_id」を指定します

*4:depthという名前の方が良いのかもしれませんが、RDBによっては使われる名前であることと、はてなブログシンタックスハイライトされてしまったため、「node」としました

Djangoで、SILENCED_SYSTEM_CHECKSを定義してSystem check frameworkのメッセージ出力を抑制する

これは JSL(日本システム技研) Advent Calendar 2020 - Qiita 12/15分の記事です。

 
DjangoにはSystem check frameworkがあり、Djangoプロジェクトの正しさをチェックしてくれます。
System check framework | Django ドキュメント | Django

そんな中、特定のチェックで大量に引っかかってしまうことがありました。

そこで、特定のチェックのメッセージ出力を抑える方法を探した時のメモを残します。

 
目次

 

環境

 

事例

例えば、Djangoを1系からバージョンアップする中で、urls.pyに

urlpatterns = [
    path('warn$', TemplateView.as_view(template_name='silence_app/index.html')),
]

と、 $ を残してしまったとします。

 
この場合、開発用のサーバを起動すると、

Performing system checks...

System check identified some issues:

WARNINGS:
?: (2_0.W001) Your URL pattern 'warn$' has a route that contains '(?P<', begins with a '^', or ends with a '$'. This was likely an oversight when migrating to django.urls.path().

System check identified 1 issue (0 silenced).

というメッセージが表示されます。

実際には他のメッセージも表示されているため、このメッセージだけを抑制したいとします。

 

対応

settings.pyに SILENCED_SYSTEM_CHECKS を定義します。
https://docs.djangoproject.com/en/3.1/ref/settings/#silenced-system-checks

今回は 2_0.W001 を抑制したいので、settings.pyに

SILENCED_SYSTEM_CHECKS = ['2_0.W001']

と定義します。

 
その後、開発サーバを起動すると

Performing system checks...

System check identified no issues (1 silenced).

へと表示が変わり、2_0.W001を抑制できました。

 
なお、この警告ですが、実際にアクセスしてみると

warn宛

$ curl localhost:8000/silence/warn -v
...
< HTTP/1.1 404 Not Found

warn$宛

$ curl localhost:8000/silence/warn$ -v
...
< HTTP/1.1 200 OK

となります。

 

ソースコード

Githubに上げました。 silence_app ディレクトリが今回のDjangoアプリです。
https://github.com/thinkAmi-sandbox/django_31-sample