Python + PuLP + ortoolpy による組合せ最適化を使って、行事の当番表を作ってみた

最近、行事の当番表を作る機会がありました。

行事の回数や当番対象の人数がそれなりだったこと、「今日の当番は何でこの組み合わせなの?」と質問された時に「プログラムが勝手にやりました」と答えたかったことから、プログラムを作って解決することにしました。

 
目次

環境

  • Python 3.9.1
  • ortoolpy 0.2.38
  • pandas 1.2.0
  • numpy 1.19.5
  • openpyxl 3.0.6
    • Excelへ出力する時に使用

 

ランダムな組み合わせ編

実装

当番をする人の組み合わせは自由とのことでした。

その他の条件を聞いたところ、以下でした。

  • 行事は18回
  • 一行事あたり3名の当番が必要
  • 一人1回当番をすれば良い

 
そこで、上記の条件を満たしつつ、「行事ごとにグループを作り、そのグループへランダムに人を割り当てる」仕様にて、Pythonスクリプトを書きました。

# shuffle.py
GROUP_COUNT = 18

def divide_users(users):
    current_index = 0
    
    # 当番の人のリストを要素として持つ、行事のリスト
    groups = [[] for _ in range(GROUP_COUNT)]

    random.shuffle(users)

    for user in users:
        groups[current_index].append(user)

        if current_index < GROUP_COUNT - 1:
            current_index += 1
        else:
            current_index = 0

    return groups

 

テスト

動作を確認するため、テストコードを用意します。

# test_shuffle.py
import unittest
from shuffle import divide_users

class TestShuffle(unittest.TestCase):
    def test_divide_users(self):
        users = [f'user{i}' for i in range(1, 54 + 1)]
        group = divide_users(users)

        user_set = set()
        for users in group:
            # 1当番3名か
            self.assertEqual(len(users), 3)

            # 1回のみ割り当てられているか
            for user in users:
                self.assertNotIn(user, user_set)
                user_set.add(user)


if __name__ == '__main__':
    unittest.main()

 
実行したところ、テストがパスしました。

% python test_shuffle.py
.
----------------------------------------------------------------------
Ran 1 test in 0.000s

OK

 
これで良さそうです。

 

組合せ最適化編

前置き

ひとまず作ってみたものの、現実には「この日はちょっと用事があるので、当番はできない」がありそうです。

そこで各個人で「行事ごとに、当番可/不可を記入」したリストを用意してもらう前提で、プログラムを改修することにしました。

 
当番表の作成はアルバイトのシフトを組む感じなので、組合せ最適化とかナーススケジューリングの方面でやり方を探したところ、参考となる記事がありました。

 
遺伝的アルゴリズムは面白そうだったのですが、今の自分だと yak shaving になりそうだったので、組合せ最適化で作ることにしました。

 

目的関数と制約条件

ナーススケジューリングの場合、一人が複数回シフトに入ります。一方、今回は一人1回のシフトです。

そこで、組合せ最適化でナーススケジューリング問題を解く - Qiita をベースにしつつ、目的関数と制約条件を差し替えることにしました。

 
ベースコードでは目的関数として複数挙げられています。ただ、今回は「必要人数差」だけそうです。これが最小になるものを求めます。

次に制約条件ですが、こちらはベースコードとはだいぶ異なります。

  • 1当番あたり3人
  • 一人あたり1回の当番
    • 複数回は不可
  • その人が当番可能な時に割り当てる
    • 無理に当番をお願いすることはできない

 
また、各個人からもらったリストを元に、当番可/不可のExcel表を作成します。その書式は以下のとおりです。

    • 行事の開催回数
    • 当番対象者
  • セル
    • 1 が当番可
    • 0 が当番

 
イメージはこんな感じです。

f:id:thinkAmi:20210202082108p:plain

 

実装

Excelを読んで当番のグループを作る関数を用意します。

 

全体像

個別の実装は後述するとして、まずは全体像です。

def divide_users():
    # デフォルトだと 54 x 54 で読まれるので、不要行は読み込まないようにする
    skip_rows = [i + GROUP_COUNT for i in range(1, USER_COUNT - GROUP_COUNT + 1)]
    df = pd.read_excel(FILE_NAME, header=0, index_col=0, skiprows=skip_rows)

    # 当番回数
    event_count = df.shape[0]
    # print(f'{type(box_size)}: {box_size}')
    # => <class 'int'>: 18

    # ユーザ数
    user_count = df.shape[1]
    # print(f'{type(user_size)}: {user_size}')
    # => <class 'int'>: 54

    # 数理モデル
    model = LpProblem()

    # 変数を準備(当番/非当番の2値なので、0-1変数リスト)
    # https://docs.pyq.jp/python/math_opt/pdopt.html
    var_schedule = np.array(addbinvars(event_count, user_count))
    df['必要人数差'] = addvars(event_count)

    # 重み
    weight = 1

    # 目的関数の割り当て
    model += lpSum(df.必要人数差) * weight

    # 制約
    # 1当番あたり3人
    for idx, row in df.iterrows():
        model += row.必要人数差 >= (lpSum(var_schedule[row.name]) - 3)
        model += row.必要人数差 >= -(lpSum(var_schedule[row.name]) - 3)

    # 一人あたり1回当番すればよい
    for user in range(user_count):
        scheduled = [var_schedule[event, user] for event in range(event_count)]
        model += lpSum(pd.Series(scheduled)) <= 1

    # 当番可能なイベントだけ割り当てる
    df_rev = df[df.columns].apply(lambda r: 1 - r[df.columns], 1)
    for (_, d), (_, s) in zip(df_rev.iterrows(), pd.DataFrame(var_schedule).iterrows()):
        model += lpDot(d, s) <= 0

    # 実行
    model.solve()

    # 結果取得
    vectorized_results = np.vectorize(value)(var_schedule).astype(int)
    # print(type(vectorized_results))
    # => <class 'numpy.ndarray'>

    group = [[] for _ in range(event_count)]
    for i, vectorized_result in enumerate(vectorized_results):
        for result, name in zip(vectorized_result, df.columns):
            if result * name:
                group[i].append(name)

    return group

 

個別に見る

まずはpandasでExcelからデータを読み込みます。

    skip_rows = [i + GROUP_COUNT for i in range(1, USER_COUNT - GROUP_COUNT + 1)]
    df = pd.read_excel(FILE_NAME, header=0, index_col=0, skiprows=skip_rows)

    # 当番回数
    event_count = df.shape[0]

    # ユーザ数
    user_count = df.shape[1]

 
数理モデルを作成します。

model = LpProblem()

 
次に、ortoolpyを使った変数を用意します。
データ分析と最適化 — Pythonオンライン学習サービス PyQ(パイキュー)ドキュメント

今回は当番/非当番の2値を持っていれば良いので、0-1変数リストとします。

# スケジュールされるもの
var_schedule = np.array(addbinvars(event_count, user_count))

# スケジュールした時の人数差
df['必要人数差'] = addvars(event_count)

 
次は目的関数です。重みを乗じた上で値が最小となるものを探します。

# 重み
weight = 1

# 目的関数の割り当て
model += lpSum(df.必要人数差) * weight

 
制約条件その1は「1当番あたり3人」です。こちらはベースと変わりません。

for idx, row in df.iterrows():
    model += row.必要人数差 >= (lpSum(var_schedule[row.name]) - 3)
    model += row.必要人数差 >= -(lpSum(var_schedule[row.name]) - 3)

 
制約条件その2は「一人あたりの当番は1回」です。

ユーザーごとにイベントで当番が割り当てられたかをサマリし、回数が1になっているものとします。

for user in range(user_count):
    scheduled = [var_schedule[event, user] for event in range(event_count)]
    model += lpSum(pd.Series(scheduled)) <= 1

 
制約条件その3は「当番可能なときだけ割り当てる」です。
Pythonでシフトを自動作成するアプリを作成、運用した話 | 機械学習、ウェブ開発、自動化の備忘録

df_rev = df[df.columns].apply(lambda r: 1 - r[df.columns], 1)
for (_, d), (_, s) in zip(df_rev.iterrows(), pd.DataFrame(var_schedule).iterrows()):
    model += lpDot(d, s) <= 0

 
あとは、実行した後、当番グループを作っています。

# 実行
model.solve()

# 結果取得
vectorized_results = np.vectorize(value)(var_schedule).astype(int)
# print(type(vectorized_results))
# => <class 'numpy.ndarray'>

group = [[] for _ in range(event_count)]
for i, vectorized_result in enumerate(vectorized_results):
    for result, name in zip(vectorized_result, df.columns):
        if result * name:
            group[i].append(name)

return group

 

テスト

今回もテストコードにて確認します。

 

テストデータ作成

テストをするために、適当な当番可/不可一覧のExcelファイルを生成します。

当番可能なのは、一人あたり全体の6割とします。

import copy
import random

import openpyxl

OK = [1] * 10  # 18回中6割がOK
NG = [0] * 8

FILE_NAME = 'users_for_optimization.xlsx'
GROUP_COUNT = 18
USER_COUNT = 54


def main():
    wb = openpyxl.Workbook()
    ws = wb.worksheets[0]

    # タイトル行
    name_cell = ws.cell(row=1, column=1)
    name_cell.value = '開催番号'

    for i in range(2, USER_COUNT + 2):
        name_cell = ws.cell(row=1, column=i)
        name_cell.value = f'ユーザ_{i - 1}'

    # 開催回数列
    for i in range(2, GROUP_COUNT + 2):
        group_cell = ws.cell(row=i, column=1)
        group_cell.value = i - 2

    # 54人分のデータ
    for i in range(2, USER_COUNT + 2):
        ok = copy.deepcopy(OK)
        ng = copy.deepcopy(NG)
        total = ok + ng
        random.shuffle(total)

        # 18回の情報を埋める
        row = 2
        while total:
            result = total.pop()
            cell = ws.cell(row=row, column=i)
            cell.value = result

            row += 1

    wb.save(FILE_NAME)


if __name__ == "__main__":
    main()

 

テストコード

関数 divide_users() の戻り値が条件を満たしているかを確認します。

import unittest

import openpyxl

from make_data import FILE_NAME, USER_COUNT, GROUP_COUNT
from optimization import divide_users


class TestOptimization(unittest.TestCase):
    def create_ok_list(self):
        ok_list = {i: [] for i in range(18)}

        wb = openpyxl.load_workbook(FILE_NAME)
        ws = wb.worksheets[0]

        for col_index in range(2, USER_COUNT + 2):
            user_name = ws.cell(row=1, column=col_index).value

            for row_index in range(2, GROUP_COUNT + 2):
                if ws.cell(row=row_index, column=col_index).value == 1:
                    ok_list[row_index - 2].append(user_name)

        return ok_list

    def test_divide_users(self):
        group = divide_users()

        user_set = set()
        for users in group:
            # 1当番3名か
            self.assertEqual(len(users), 3)

            # 1回のみ割り当てられているか
            for user in users:
                self.assertNotIn(user, user_set)
                user_set.add(user)

        # 自分の希望した場所のみか
        ok_list = self.create_ok_list()
        for i, users in enumerate(group):
            for user in users:
                self.assertIn(user, ok_list[i])


if __name__ == '__main__':
    unittest.main()

 
実行するとテストをパスするため、これで良さそうです。

% python test_optimization.py 
.
----------------------------------------------------------------------
Ran 1 test in 0.154s

OK

 

ソースコード

Githubに上げました。

https://github.com/thinkAmi/toban_kuji