Scapy + Python2 + I/O多重化・ブロッキングI/O系モジュールにて、Raspberry Pi 2 Model Bをブリッジ化する

前回、Scapyの bridge_and_sniff() 関数を使って、Raspberry Pi 2 Model B をブリッジ化してみました。
Python2 + Scapyで、Raspberry Pi 2 Model B をブリッジにできるか試してみた #router_jisaku - メモ的な思考的な

ただ、 bridge_and_sniff() 関数を使うだけではお手軽すぎるため、今回は別の方法でブリッジ化してみます。

 
なお、誤った理解があれば、ご指摘いただけるとありがたいです。

 
目次

 

環境

前回の環境と同じですので、さらっと書いておきます。

  • Mac OS X 10.11.6
    • 疎通確認のために、PythonでHTTPサーバを立てる
  • Raspberry Pi 2 Model B
    • Raspbian
      • RASPBIAN STRETCH WITH DESKTOP
      • Version: November 2017
    • Python 2.7.13
      • Raspbianにインストールされていたものを使用
    • Scapy 2.3.3
    • 外付けUSB有線LANアダプタ(LUA4-U3-AGT)
    • SSHMacから接続可能
  • Windows10
    • curlをインストール済

 

bridge_and_sniff()関数の実装を見る

まずは、Scapyのbridge_and_sniff()関数の実装を見てみます。コードはこのあたりです。
https://github.com/secdev/scapy/blob/v2.3.3/scapy/sendrecv.py#L552

bridge_and_sniff()関数では、ネットワークというI/Oバウンドな処理を効果的に行えるよう、I/O多重化の select を使っていました。

処理の流れは

でした。

 

今回試してみる実装形式

I/Oバウンドを効果的に処理できそうなPythonの標準モジュールをみたところ、

  • I/O多重化: select
  • ブロッキングI/O系
    • マルチスレッド: threading
    • スレッドプール: concurrent.futures
  • ノンブロッキングI/O系
    • イベントループ(Python3.4〜): asyncio
    • Python3では後方互換維持のためだけに存在: asyncore

がありました。

Python2なScapyなこともあり、ノンブロッキングI/O系の実装は難しそうなため*1、今回はI/O多重化とブロッキングI/O系モジュールを使って実装してみます。

 
なお、I/Oバウンドな処理ではマルチプロセスやプロセスプールは効果的ではありません。
Pythonをとりまく並行/非同期の話

とはいえ、それらのコードも書いてみたかったので、

  • ブロッキングI/O系
    • マルチプロセス: multiprocess
    • プロセスプール: concurrent.futures

でも実装してみました。

 

I/O多重化:selectモジュールのselect()での実装

I/O多重化には

  • select
  • poll
  • epoll

などがあります。

 
まずは、Scapyのbridge_and_sniff()関数と同じく、selectモジュールの select() を試してみます。
16.1. select — I/O 処理の完了を待機する — Python 2.7.14 ドキュメント

scapy_bridge_select.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import select


def bridge():
    try:
        # レイヤ2のソケットを用意
        eth0_socket = conf.L2socket(iface='eth0')
        eth1_socket = conf.L2socket(iface='eth1')
        # 別のインタフェースからパケットを送信するための辞書
        next_socket = {
            eth0_socket: eth1_socket,
            eth1_socket: eth0_socket,
        }
        while True:
            # select()関数で使えるようになるまで待機
            readable_sockets, _, _ = select.select([eth0_socket, eth1_socket], [], [])

            for s in readable_sockets:
                # 準備できたソケットから受信
                p = s.recv()

                if p:
                    # パケット全体をターミナルへ表示
                    p.show()

                    # 受信したパケットを別のインタフェースから送信
                    next_socket[s].send(p.original)

    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    bridge()

 
ためしにrecv()で受信した時の内容を見てみると、

# パケットの型をターミナルへ表示
print '---- packet type from recv(): {}'.format(type(p))
# => <class 'scapy.layers.l2.Ether'>
print '---- original type from recv(): {}'.format(type(p.original))
# => <type 'str'>

でした。

 

I/O多重化:selectモジュールのepoll()での実装

RaspbianはLinuxベースのOSなため、I/O多重化の epoll が使えます。

Pythonではselectモジュールの epoll() 関数を使って実装します。

scapy_bridge_epoll.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import select


def bridge():
    try:
        # レイヤ2のソケットを用意
        eth0_socket = conf.L2socket(iface='eth0')
        eth1_socket = conf.L2socket(iface='eth1')

        epoll = select.epoll()
        # 読み出し可能なデータが存在する場合を登録
        # https://docs.python.jp/2/library/select.html#edge-and-level-trigger-polling-epoll-objects
        epoll.register(eth0_socket.fileno(), select.EPOLLIN)
        epoll.register(eth1_socket.fileno(), select.EPOLLIN)

        while True:
            # イベントを1秒待機
            events = epoll.poll(1)

            for fd, event in events:
                # ファイルディスクリプタがeth0のものと等しい場合
                if fd == eth0_socket.fileno():
                    p_eth0 = eth0_socket.recv()

                    if p_eth0:
                        # eth0で受信したパケットをeth1で送信
                        eth1_socket.send(p_eth0.original)

                # ファイルディスクリプタがeth1のものと等しい場合
                elif fd == eth1_socket.fileno():
                    p_eth1 = eth1_socket.recv()

                    if p_eth1:
                        # eth1で受信したパケットをeth0で送信
                        eth0_socket.send(p_eth1.original)

    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    bridge()

 
ためしに、epoll.poll()で取得したfdとeventの値を見ると、

print '--- fd type: {}'.format(type(fd))
# => fd type: <type 'int'>
print '--- event type: {}'.format(type(event))
# => event type: <type 'int'>

でした。

 

マルチスレッド:threadingモジュールでの実装

今度はマルチスレッドの threading モジュールを使ってみます。

 
threadingモジュールを使う場合、 Ctrl +C で終了できなくなりますが、今回はデーモンスレッドで動かすことで回避しました。

 
ただ、デーモンスレッドの場合、

注釈 デーモンスレッドは終了時にいきなり停止されます。デーモンスレッドで使われたリソース (開いているファイル、データベースのトランザクションなど) は適切に解放されないかもしれません。きちんと (gracefully) スレッドを停止したい場合は、スレッドを非デーモンスレッドにして、Event のような適切なシグナル送信機構を使用してください。

https://docs.python.jp/3/library/threading.html#thread-objects

に注意します。今回は関係なさそうなので、気にしないことにします。

scapy_bridge_threading.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import threading


def bridge_from_eth0_to_eth1():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth0_socket.recv()
        if p:
            eth1_socket.send(p.original)


def bridge_from_eth1_to_eth0():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth1_socket.recv()
        if p:
            eth0_socket.send(p.original)


def bridge():
    try:
        # スレッドを用意
        bridge_eth0 = threading.Thread(target=bridge_from_eth0_to_eth1)
        bridge_eth1 = threading.Thread(target=bridge_from_eth1_to_eth0)

        # 今回はいきなり止まっても問題ないため、デーモンモードで動くようにする
        # https://docs.python.jp/3/library/threading.html#thread-objects
        bridge_eth0.daemon = True
        bridge_eth1.daemon = True

        # スレッドを開始
        bridge_eth0.start()
        bridge_eth1.start()

        # KeyboardInterruptを受け付けるよう、join()を秒指定で使う
        bridge_eth0.join(5)
        bridge_eth1.join(5)

    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    bridge()

 

マルチプロセス:multiprocessでの実装

次はマルチプロセスにて実装します。

 
繰り返しとなりますが、I/Oバウンドではマルチプロセス化しても効果的な処理になりません。今回は書いてみたかったという理由だけで実装しています。

scapy_bridge_multiprocess.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import multiprocessing


def bridge_from_eth0_to_eth1():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth0_socket.recv()
        if p:
            eth1_socket.send(p.original)


def bridge_from_eth1_to_eth0():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth1_socket.recv()
        if p:
            eth0_socket.send(p.original)


def bridge():
    try:
        # プロセスを用意
        bridge_eth0 = multiprocessing.Process(target=bridge_from_eth0_to_eth1)
        bridge_eth1 = multiprocessing.Process(target=bridge_from_eth1_to_eth0)

        # プロセスを開始
        bridge_eth0.start()
        bridge_eth1.start()

    except KeyboardInterrupt:
        # threadingと異なり、特に何もしなくてもCtrl + C が可能
        pass


if __name__ == '__main__':
    bridge()

 

スレッドプール:concurrent.futures.ThreadPoolExecutorでの実装

Python3.2より登場した concurrent.futures モジュールの ThreadPoolExecutor を使ってスレッドプール版を実装します。
17.4. concurrent.futures – 並列タスク実行 — Python 3.6.3 ドキュメント

ただ、Python2では標準モジュールに入っていないため、バックポートされたモジュール futuresPyPIからインストールする必要があります。
https://pypi.python.org/pypi/futures

pi@raspberrypi:~/router_jisaku $ sudo pip install futures
Collecting futures
  Using cached futures-3.2.0-py2-none-any.whl
Installing collected packages: futures
Successfully installed futures-3.2.0

 
実装です。

scapy_bridge_ThreadPoolExecutor.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import concurrent.futures


def bridge_from_eth0_to_eth1():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth0_socket.recv()
        if p:
            eth1_socket.send(p.original)


def bridge_from_eth1_to_eth0():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth1_socket.recv()
        if p:
            eth0_socket.send(p.original)


def bridge():
    try:
        # スレッドを用意
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        executor.submit(bridge_from_eth0_to_eth1)
        executor.submit(bridge_from_eth1_to_eth0)

    except KeyboardInterrupt:
        # threadingと異なり、特に何もしなくてもCtrl + C が可能
        pass


if __name__ == '__main__':
    bridge()

 

プロセスプール:concurrent.futures.ProcessPoolExecutorでの実装

こちらもバックポートされたモジュール futures を使います。

ProcessPoolExecutorでの実装ですが、

  • I/Oバウンドでは効果的な処理にならない
  • PyPIにある注釈によると、クリティカルなところでは使うべきでないとのこと

より、マルチプロセス版同様、実装してみたかっただけです。

scapy_bridge_ProcessPoolExecutor.py

# -*- coding: utf-8 -*-
from scapy.all import conf
import concurrent.futures


def bridge_from_eth0_to_eth1():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth0_socket.recv()
        if p:
            eth1_socket.send(p.original)


def bridge_from_eth1_to_eth0():
    eth0_socket = conf.L2socket(iface='eth0')
    eth1_socket = conf.L2socket(iface='eth1')

    while True:
        p = eth1_socket.recv()
        if p:
            eth0_socket.send(p.original)


def bridge():
    try:
        # スレッドを用意
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        executor.submit(bridge_from_eth0_to_eth1)
        executor.submit(bridge_from_eth1_to_eth0)

    except KeyboardInterrupt:
        # threadingと異なり、特に何もしなくてもCtrl + C が可能
        pass


if __name__ == '__main__':
    bridge()

 

参考資料

 

ソースコード

GitHubに上げました。ディレクトscapy_python2/bridge/ にある

  • scapy_bridge_select.py
  • scapy_bridge_epoll.py
  • scapy_bridge_threading.py
  • scapy_bridge_multiprocess.py
  • scapy_bridge_ThreadPoolExecutor.py
  • scapy_bridge_ProcessPoolExecutor.py

が今回のファイルです。
https://github.com/thinkAmi-sandbox/syakyo-router_jisaku

*1:正確には、 asyncioにはバックポートプロジェクトのTrolliusがあります。ただ、実装でハマったときに大変そうなので、今回実装するのはやめておきます。https://github.com/vstinner/trollius