前回、Scapyの bridge_and_sniff()
関数を使って、Raspberry Pi 2 Model B をブリッジ化してみました。
Python2 + Scapyで、Raspberry Pi 2 Model B をブリッジにできるか試してみた #router_jisaku - メモ的な思考的な
ただ、 bridge_and_sniff()
関数を使うだけではお手軽すぎるため、今回は別の方法でブリッジ化してみます。
なお、誤った理解があれば、ご指摘いただけるとありがたいです。
目次
- 環境
- bridge_and_sniff()関数の実装を見る
- 今回試してみる実装形式
- I/O多重化:selectモジュールのselect()での実装
- I/O多重化:selectモジュールのepoll()での実装
- マルチスレッド:threadingモジュールでの実装
- マルチプロセス:multiprocessでの実装
- スレッドプール:concurrent.futures.ThreadPoolExecutorでの実装
- プロセスプール:concurrent.futures.ProcessPoolExecutorでの実装
- 参考資料
- ソースコード
環境
前回の環境と同じですので、さらっと書いておきます。
- Mac OS X 10.11.6
- 疎通確認のために、PythonでHTTPサーバを立てる
- Raspberry Pi 2 Model B
- Windows10
- curlをインストール済
bridge_and_sniff()関数の実装を見る
まずは、Scapyのbridge_and_sniff()関数の実装を見てみます。コードはこのあたりです。
https://github.com/secdev/scapy/blob/v2.3.3/scapy/sendrecv.py#L552
bridge_and_sniff()関数では、ネットワークというI/Oバウンドな処理を効果的に行えるよう、I/O多重化の select
を使っていました。
処理の流れは
- 標準モジュール
select
のselect()
関数を使って、Socketを待ち受け p = s.recv()
で受信peerof[s].send(p.original)
で受信したパケットを別のインタフェースから送信
でした。
今回試してみる実装形式
I/Oバウンドを効果的に処理できそうなPythonの標準モジュールをみたところ、
- I/O多重化: select
- ブロッキングI/O系
- マルチスレッド: threading
- スレッドプール: concurrent.futures
- ノンブロッキングI/O系
- イベントループ(Python3.4〜): asyncio
- Python3では後方互換維持のためだけに存在: asyncore
がありました。
Python2なScapyなこともあり、ノンブロッキングI/O系の実装は難しそうなため*1、今回はI/O多重化とブロッキングI/O系モジュールを使って実装してみます。
なお、I/Oバウンドな処理ではマルチプロセスやプロセスプールは効果的ではありません。
Pythonをとりまく並行/非同期の話
とはいえ、それらのコードも書いてみたかったので、
- ブロッキングI/O系
- マルチプロセス: multiprocess
- プロセスプール: concurrent.futures
でも実装してみました。
I/O多重化:selectモジュールのselect()での実装
I/O多重化には
- select
- poll
- epoll
などがあります。
まずは、Scapyのbridge_and_sniff()関数と同じく、selectモジュールの select()
を試してみます。
16.1. select — I/O 処理の完了を待機する — Python 2.7.14 ドキュメント
scapy_bridge_select.py
# -*- coding: utf-8 -*- from scapy.all import conf import select def bridge(): try: # レイヤ2のソケットを用意 eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') # 別のインタフェースからパケットを送信するための辞書 next_socket = { eth0_socket: eth1_socket, eth1_socket: eth0_socket, } while True: # select()関数で使えるようになるまで待機 readable_sockets, _, _ = select.select([eth0_socket, eth1_socket], [], []) for s in readable_sockets: # 準備できたソケットから受信 p = s.recv() if p: # パケット全体をターミナルへ表示 p.show() # 受信したパケットを別のインタフェースから送信 next_socket[s].send(p.original) except KeyboardInterrupt: pass if __name__ == '__main__': bridge()
ためしにrecv()で受信した時の内容を見てみると、
# パケットの型をターミナルへ表示 print '---- packet type from recv(): {}'.format(type(p)) # => <class 'scapy.layers.l2.Ether'> print '---- original type from recv(): {}'.format(type(p.original)) # => <type 'str'>
でした。
I/O多重化:selectモジュールのepoll()での実装
RaspbianはLinuxベースのOSなため、I/O多重化の epoll
が使えます。
Pythonではselectモジュールの epoll()
関数を使って実装します。
scapy_bridge_epoll.py
# -*- coding: utf-8 -*- from scapy.all import conf import select def bridge(): try: # レイヤ2のソケットを用意 eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') epoll = select.epoll() # 読み出し可能なデータが存在する場合を登録 # https://docs.python.jp/2/library/select.html#edge-and-level-trigger-polling-epoll-objects epoll.register(eth0_socket.fileno(), select.EPOLLIN) epoll.register(eth1_socket.fileno(), select.EPOLLIN) while True: # イベントを1秒待機 events = epoll.poll(1) for fd, event in events: # ファイルディスクリプタがeth0のものと等しい場合 if fd == eth0_socket.fileno(): p_eth0 = eth0_socket.recv() if p_eth0: # eth0で受信したパケットをeth1で送信 eth1_socket.send(p_eth0.original) # ファイルディスクリプタがeth1のものと等しい場合 elif fd == eth1_socket.fileno(): p_eth1 = eth1_socket.recv() if p_eth1: # eth1で受信したパケットをeth0で送信 eth0_socket.send(p_eth1.original) except KeyboardInterrupt: pass if __name__ == '__main__': bridge()
ためしに、epoll.poll()で取得したfdとeventの値を見ると、
print '--- fd type: {}'.format(type(fd)) # => fd type: <type 'int'> print '--- event type: {}'.format(type(event)) # => event type: <type 'int'>
でした。
マルチスレッド:threadingモジュールでの実装
今度はマルチスレッドの threading
モジュールを使ってみます。
- 16.2. threading — 高水準のスレッドインタフェース — Python 2.7.14 ドキュメント
- threading – スレッドによる並列処理を管理する - Python Module of the Week
threadingモジュールを使う場合、 Ctrl +C
で終了できなくなりますが、今回はデーモンスレッドで動かすことで回避しました。
- python - threading ignores KeyboardInterrupt exception - Stack Overflow
- Ctrl+Cとkill -SIGINTの違いからLinuxプロセスグループを理解する | ギークを目指して
ただ、デーモンスレッドの場合、
注釈 デーモンスレッドは終了時にいきなり停止されます。デーモンスレッドで使われたリソース (開いているファイル、データベースのトランザクションなど) は適切に解放されないかもしれません。きちんと (gracefully) スレッドを停止したい場合は、スレッドを非デーモンスレッドにして、Event のような適切なシグナル送信機構を使用してください。
https://docs.python.jp/3/library/threading.html#thread-objects
に注意します。今回は関係なさそうなので、気にしないことにします。
scapy_bridge_threading.py
# -*- coding: utf-8 -*- from scapy.all import conf import threading def bridge_from_eth0_to_eth1(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth0_socket.recv() if p: eth1_socket.send(p.original) def bridge_from_eth1_to_eth0(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth1_socket.recv() if p: eth0_socket.send(p.original) def bridge(): try: # スレッドを用意 bridge_eth0 = threading.Thread(target=bridge_from_eth0_to_eth1) bridge_eth1 = threading.Thread(target=bridge_from_eth1_to_eth0) # 今回はいきなり止まっても問題ないため、デーモンモードで動くようにする # https://docs.python.jp/3/library/threading.html#thread-objects bridge_eth0.daemon = True bridge_eth1.daemon = True # スレッドを開始 bridge_eth0.start() bridge_eth1.start() # KeyboardInterruptを受け付けるよう、join()を秒指定で使う bridge_eth0.join(5) bridge_eth1.join(5) except KeyboardInterrupt: pass if __name__ == '__main__': bridge()
マルチプロセス:multiprocessでの実装
次はマルチプロセスにて実装します。
- 16.6. multiprocessing — プロセスベースの "並列処理" インタフェース — Python 2.7.14 ドキュメント
- multiprocessing の基本 - Python Module of the Week
繰り返しとなりますが、I/Oバウンドではマルチプロセス化しても効果的な処理になりません。今回は書いてみたかったという理由だけで実装しています。
scapy_bridge_multiprocess.py
# -*- coding: utf-8 -*- from scapy.all import conf import multiprocessing def bridge_from_eth0_to_eth1(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth0_socket.recv() if p: eth1_socket.send(p.original) def bridge_from_eth1_to_eth0(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth1_socket.recv() if p: eth0_socket.send(p.original) def bridge(): try: # プロセスを用意 bridge_eth0 = multiprocessing.Process(target=bridge_from_eth0_to_eth1) bridge_eth1 = multiprocessing.Process(target=bridge_from_eth1_to_eth0) # プロセスを開始 bridge_eth0.start() bridge_eth1.start() except KeyboardInterrupt: # threadingと異なり、特に何もしなくてもCtrl + C が可能 pass if __name__ == '__main__': bridge()
スレッドプール:concurrent.futures.ThreadPoolExecutorでの実装
Python3.2より登場した concurrent.futures
モジュールの ThreadPoolExecutor
を使ってスレッドプール版を実装します。
17.4. concurrent.futures – 並列タスク実行 — Python 3.6.3 ドキュメント
ただ、Python2では標準モジュールに入っていないため、バックポートされたモジュール futures
をPyPIからインストールする必要があります。
https://pypi.python.org/pypi/futures
pi@raspberrypi:~/router_jisaku $ sudo pip install futures Collecting futures Using cached futures-3.2.0-py2-none-any.whl Installing collected packages: futures Successfully installed futures-3.2.0
実装です。
scapy_bridge_ThreadPoolExecutor.py
# -*- coding: utf-8 -*- from scapy.all import conf import concurrent.futures def bridge_from_eth0_to_eth1(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth0_socket.recv() if p: eth1_socket.send(p.original) def bridge_from_eth1_to_eth0(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth1_socket.recv() if p: eth0_socket.send(p.original) def bridge(): try: # スレッドを用意 executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) executor.submit(bridge_from_eth0_to_eth1) executor.submit(bridge_from_eth1_to_eth0) except KeyboardInterrupt: # threadingと異なり、特に何もしなくてもCtrl + C が可能 pass if __name__ == '__main__': bridge()
プロセスプール:concurrent.futures.ProcessPoolExecutorでの実装
こちらもバックポートされたモジュール futures
を使います。
ProcessPoolExecutorでの実装ですが、
- I/Oバウンドでは効果的な処理にならない
- PyPIにある注釈によると、クリティカルなところでは使うべきでないとのこと
- The ProcessPoolExecutor class has known (unfixable) problems on Python 2 and should not be relied on for mission critical work.
より、マルチプロセス版同様、実装してみたかっただけです。
scapy_bridge_ProcessPoolExecutor.py
# -*- coding: utf-8 -*- from scapy.all import conf import concurrent.futures def bridge_from_eth0_to_eth1(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth0_socket.recv() if p: eth1_socket.send(p.original) def bridge_from_eth1_to_eth0(): eth0_socket = conf.L2socket(iface='eth0') eth1_socket = conf.L2socket(iface='eth1') while True: p = eth1_socket.recv() if p: eth0_socket.send(p.original) def bridge(): try: # スレッドを用意 executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) executor.submit(bridge_from_eth0_to_eth1) executor.submit(bridge_from_eth1_to_eth0) except KeyboardInterrupt: # threadingと異なり、特に何もしなくてもCtrl + C が可能 pass if __name__ == '__main__': bridge()
参考資料
- 非同期とノンブロッキングとあと何か | κeenのHappy Hacκing Blog
- Node.jsと非同期I/Oと混乱した私 - shutdown -r now
- ノンブロッキングI/Oと非同期I/Oの違いを理解する – PAYFORWARD
- 非同期I/O概説 - SlideShare
- Boost application performance using asynchronous I/O
- Concurrency and Parallelism: Understanding I/O | @RisingStack
- Pythonでネットワークプログラミング / fujimisakari blog
- Pythonをとりまく並行/非同期の話
- Python: ソケットプログラミングのアーキテクチャパターン - CUBE SUGAR CONTAINER
ソースコード
GitHubに上げました。ディレクトリ scapy_python2/bridge/
にある
- scapy_bridge_select.py
- scapy_bridge_epoll.py
- scapy_bridge_threading.py
- scapy_bridge_multiprocess.py
- scapy_bridge_ThreadPoolExecutor.py
- scapy_bridge_ProcessPoolExecutor.py
が今回のファイルです。
https://github.com/thinkAmi-sandbox/syakyo-router_jisaku
*1:正確には、 asyncioにはバックポートプロジェクトのTrolliusがあります。ただ、実装でハマったときに大変そうなので、今回実装するのはやめておきます。https://github.com/vstinner/trollius