Skip to content

telnet#

sample#

sample_telnetlib.py
import telnetlib

# 基本情報
host = "192.168.1.1"    
user = "cisco"
password = "cisco"
tn = telnetlib.Telnet(host)

# login 
tn.read_until(b"Username: ")
tn.write(user.encode("ascii") + b"\n")
tn.read_until(b"Password: ")
tn.write(password.encode("ascii") + b"\n")
tn.read_until(b"#")

tn.write(b'terminal length 0' + b'\n')
tn.read_until(b"#")
tn.write(b'terminal width 0' + b'\n')
tn.read_until(b"#")
tn.write(b"show run\n")
tn.read_until(b"#")
tn.write(b"exit\n")

output = tn.read_all().decode("ascii")
print(output)

mlutiprocess#

mp_telnet.py
import telnetlib
import multiprocessing as mp
import time


def run_device(device):
    # 基本情報
    host = "192.168.1.1"    
    user = "cisco"
    password = "cisco"
    tn = telnetlib.Telnet(host)

    # login 
    tn.read_until(b"Username: ")
    tn.write(user.encode("ascii") + b"\n")
    tn.read_until(b"Password: ")
    tn.write(password.encode("ascii") + b"\n")
    tn.read_until(b"#")

    tn.write(b'terminal length 0' + b'\n')
    tn.read_until(b"#")
    tn.write(b'terminal width 0' + b'\n')
    tn.read_until(b"#")
    tn.write(b"show run" + b'\n')
    tn.read_until(b"#")
    tn.write(b"exit\n")

    # output = tn.read_all().decode("ascii")
    output = tn.read_all().decode("utf-8")


if __name__ == '__main__':
    # format: hostname:ipaddress:username:password
    with open('devices.txt') as f:
        devices = f.readlines
        device_info = devices.split(":")
    # script starting time
    start = time.time()
    processes = []
    for device in device_info:
        device_info = device.split(":")
        processes.append(mp.Process(
            target=run_device, args=(device_info,)))

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # script ending time
    end = time.time()
    print('Script execution time:', end - start)

上記の例では、get_config関数が各IPアドレスごとに並列に呼び出され、Telnet接続が確立されます。telnetlibを使用して、 認証情報を入力し、show running-configコマンドを実行して機器の設定を取得します。 設定は各機器ごとに別個のテキストファイルに保存されます。

multiprocessing.Poolを使用して、プロセスプールを作成し、map関数を使用して各IPアドレスに対してget_config関数をマップします。 各関数呼び出しは独立して実行されるため、並列処理により効率的に機器から設定を取得できます。

データベースに保存#

取得したCisco機器のコンフィグをデータベースに保存するには、Pythonの標準ライブラリであるsqlite3を使用することができます。 以下は、先ほどの例にSQLite3を組み込んだコードの例です。

import telnetlib
import multiprocessing
import sqlite3

def get_config(ip):
    tn = telnetlib.Telnet(ip)
    tn.read_until(b"Username: ")
    tn.write(b"<username>\n")
    tn.read_until(b"Password: ")
    tn.write(b"<password>\n")
    tn.write(b"enable\n")
    tn.read_until(b"Password: ")
    tn.write(b"<enable_password>\n")
    tn.write(b"terminal length 0\n")
    tn.write(b"show running-config\n")
    config = tn.read_until(b"#").decode("ascii")
    tn.write(b"exit\n")
    tn.close()
    with sqlite3.connect("configs.db") as conn:
        c = conn.cursor()
        c.execute("CREATE TABLE IF NOT EXISTS configs (id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT, config TEXT)")
        c.execute("INSERT INTO configs (ip, config) VALUES (?, ?)", (ip, config))
        conn.commit()

if __name__ == "__main__":
    ips = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
    with multiprocessing.Pool(processes=len(ips)) as pool:
        pool.map(get_config, ips)

上記の例では、SQLite3を使用してconfigs.dbという名前のデータベースを作成し、 各IPアドレスのコンフィグをconfigsテーブルに挿入します。with文を使用してデータベース接続を確立し、 cursor()メソッドを使用してカーソルを取得します。CREATE TABLE IF NOT EXISTSを使用して、 configsテーブルが存在しない場合は新しく作成されます。INSERT INTOを使用して、ipとconfigの値をテーブルに挿入します。 最後に、commit()メソッドを使用してトランザクションを確定します。

この例では、with文を使用してデータベース接続を開始しているため、エラーが発生した場合に自動的にロールバックされます。 また、プロセスプール内で並列処理を実行するため、コンフィグの挿入が同時に行われる可能性があります。しかし、SQLite3はトランザクションによる排他制御を行うため、 同時に実行されたトランザクションは自動的にロックされ、順番に実行されます。

コマンド結果を取得#

  • log フォルダに取得した結果が保存される
  • 「IOS-XR_command.txt」に取得したいコマンドを記述する
import csv
import multiprocessing
import telnetlib
import socket
import logging


"""TODO:
CSV ファイルに変更
コマンド分離 -> 複数コマンドへの対応
"""

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


# TODO: コードの分離
def get_router_info(host_info):
    host = host_info['host']
    username = host_info['username']
    password = host_info['password']
    router_type = host_info['router_type']
    log_filename = f"./log/{host}_{router_type}-telnet.log"
    result = []

    try:
        if router_type == 'cisco':
            COMMAND_FILE = 'IOS-XR_command.txt'

            with telnetlib.Telnet(host, timeout=10) as tn, open(log_filename, 'w') as f:
                # tn.set_debuglevel(3) # debug 用
                logging.info(f"Connect to {host}")
                output = tn.read_until(b"Username:").decode('ascii').replace('\r\n','\n')
                result.append(output)
                tn.write(username.encode('ascii') + b"\n")
                output = tn.read_until(b"Password:").decode('ascii').replace('\r\n','\n')
                result.append(output)
                tn.write(password.encode('ascii') + b"\n")
                output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
                result.append(output)
                logging.info(f"Successfully login to {host}")
                tn.write(b"term len 0\n")
                output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
                result.append(output)

                # コマンドファイルからのコマンド入力
                with open(COMMAND_FILE, 'r',encoding='utf-8') as cf:
                    for cmd in cf:
                        cmd = cmd.strip()
                        tn.write(cmd.encode('ascii') + b"\n")
                        output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
                        result.append(output)

                # 結果の書き込み
                for i in result:
                    f.write(i)

        # TODO: JUNOS パターンの修正
        elif router_type == 'juniper':
            with telnetlib.Telnet(host, timeout=10) as tn, open(log_filename, 'w') as f:
                # tn.set_debuglevel(3) # debug 用
                logging.info(f"Connect to {host}")
                tn.read_until(b"login: ")
                tn.write(username.encode('ascii') + b"\n")
                tn.read_until(b"Password: ")
                tn.write(password.encode('ascii') + b"\n")
                tn.read_until(b">")
                logging.info(f"Successfully login to {host}")
                tn.write(b"show version\n")
                output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
                f.write(output)
                tn.write(b"exit\n")
        else:
            return logging.error(f"Error: Unknown router_type {router_type} for host {host}")

        return logging.info(f"Logout from {host}")

    except (socket.timeout, OSError) as e:
        return logging.error(f"Connection error {host}: {e}")


if __name__ == "__main__":

    processes = []
    # host_infos = [
    #     {"host": "192.168.126.132", "username": "cisco", "password": "cisco123","router_type": "cisco"},
    #     {"host": "192.168.126.133", "username": "cisco", "password": "cisco","router_type": "cisco"},
    #     {"host": "192.168.126.134", "username": "cisco", "password": "cisco","router_type": "cisco"},
    # ]
    with open('router_info.csv', 'r') as f:
        reader = csv.DictReader(f)
        host_infos = [row for row in reader]


    for host_info in host_infos:
        p = multiprocessing.Process(target=get_router_info, args=(host_info,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
router_info.csv
host,username,password,router_type
192.168.126.132,cisco,cisco123,cisco
192.168.126.133,cisco,cisco,cisco
192.168.126.134,cisco,cisco,cisco

取得した inventory を解析#

parse.py
import re
import csv


def parse_inventory(output):
    inventory = {}
    pattern = re.compile(r'^NAME:\s+"(?P<name>.*)",\s+DESCR:\s+"(?P<description>.*)"')
    for line in output.splitlines():
        match = pattern.match(line)
        if match:
            device = match.groupdict()
            pid_line = next(filter(lambda x: x.startswith('PID:'), output.splitlines()), None)
            if pid_line:
                device['pid'] = pid_line.split(':', 1)[1].strip()
            vid_line = next(filter(lambda x: x.startswith('VID:'), output.splitlines()), None)
            if vid_line:
                device['vid'] = vid_line.split(':', 1)[1].strip()
            sn_line = next(filter(lambda x: x.startswith('SN:'), output.splitlines()), None)
            if sn_line:
                device['sn'] = sn_line.split(':', 1)[1].strip()
            inventory[device['name']] = device
    return inventory


def write_to_csv(inventory_dict, output_file):
    with open(output_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        # CSVファイルのヘッダを書き込みます
        writer.writerow(['Name', 'Description', 'PID', 'VID', 'SN'])
        # 辞書の各要素をCSVファイルに書き込みます
        for device in inventory_dict.values():
            writer.writerow([device.get('name', ''),
                             device.get('description', ''),
                             device.get('pid', ''),
                             device.get('vid', ''),
                             device.get('sn', '')])


if __name__ == '__main__':
    filename = 'inventory.csv'

    with open(filename, 'w', encoding='utf-8') as f:
        f.write()

解説#

pid_line = next(filter(lambda x: x.startswith('PID:'), output.splitlines()), None)

このコードは、出力された各行から、"PID:"で始まる行を取得するために使用されています。

output.splitlines()は、出力文字列を改行コードで分割して、行のリストを作成します。

filter(lambda x: x.startswith('PID:'), output.splitlines())は、上記の行のリストから、"PID:"で始まる行だけを抽出するフィルター関数を定義します。

lambda x: x.startswith('PID:')は、引数xが"PID:"で始まるかどうかをチェックする無名関数を定義しています。

最後に、next()関数を使用して、フィルターされた行の中で最初に見つかった行を取得しています。もし見つからなかった場合は、Noneを返します。

つまり、このコードは、"PID:"で始まる行がある場合はその行を、ない場合はNoneを取得するために使われています。同様の方法で、"VID:"と"SN:"の情報を取得しています。

!!! note ”改行の扱い" Telnetプロトコルでは改行コードとして"\r\n"を使うため、Telnetセッション中にコマンドを送信する際、改行を表すために"\r\n"を使用する必要があります。 しかし、場合によっては、"\r\n"がコマンドの一部として解釈されてしまい、不正なコマンドとして扱われることがあります。

この問題を回避するには、以下の2つの方法があります。

改行を表す文字列を"\r\n"ではなく、"\n"のみに変更する。
例えば、以下のようにコマンドを送信する際に改行コードを"\n"に変更することができます。

```
tn.write(b"show interfaces ge-0/0/0 | match input\n")
```

改行を表す文字列を"\r\n"にするが、コマンド末尾に空白文字列を付け足す。
空白文字列は、"\r\n"の直後にスペースを追加することで表現することができます。例えば、以下のようにコマンドを送信することができます。


```py
tn.write(b"show interfaces ge-0/0/0 | match input \r\n")
```

JUNOS json 形式を解析#

このコードでは、telnetlibを使用してJunosデバイスに接続し、show interfacesコマンドのJSON出力を取得します。 次に、jsonモジュールを使用してJSON出力を解析し、各インタフェースの名前と速度を表示します。 最後に、telnetセッションを終了します。

import telnetlib
import re
import json

# Junosデバイスに接続する
tn = telnetlib.Telnet("192.0.2.1")
tn.read_until(b"login: ")
tn.write(b"username\n")
tn.read_until(b"Password: ")
tn.write(b"password\n")

# show interfacesのJSON出力を取得する
tn.write(b"show interfaces | display json\n")
output = tn.read_until(b"}", timeout=5)
json_output = json.loads(output + b"}")

# JSON出力を解析する
interfaces = json_output['interface-information'][0]['physical-interface']

for interface in interfaces:
    name = interface['name']
    speed = interface['speed']
    print(f'Interface {name} has speed {speed}bps')

# Junosデバイスから切断する
tn.write(b"exit\n")
tn.read_all()

参考サイト#