telnet#
sample#
import telnetlib
# 基本情報
host = "192.168.1.1"
user = "cisco"
password = "cisco"
tn = telnetlib.Telnet(host)
# login
tn.read_until(b"Username: ")
tn.write(user.encode("ascii") + b"\n")
tn.read_until(b"Password: ")
tn.write(password.encode("ascii") + b"\n")
tn.read_until(b"#")
tn.write(b'terminal length 0' + b'\n')
tn.read_until(b"#")
tn.write(b'terminal width 0' + b'\n')
tn.read_until(b"#")
tn.write(b"show run\n")
tn.read_until(b"#")
tn.write(b"exit\n")
output = tn.read_all().decode("ascii")
print(output)
mlutiprocess#
import telnetlib
import multiprocessing as mp
import time
def run_device(device):
# 基本情報
host = "192.168.1.1"
user = "cisco"
password = "cisco"
tn = telnetlib.Telnet(host)
# login
tn.read_until(b"Username: ")
tn.write(user.encode("ascii") + b"\n")
tn.read_until(b"Password: ")
tn.write(password.encode("ascii") + b"\n")
tn.read_until(b"#")
tn.write(b'terminal length 0' + b'\n')
tn.read_until(b"#")
tn.write(b'terminal width 0' + b'\n')
tn.read_until(b"#")
tn.write(b"show run" + b'\n')
tn.read_until(b"#")
tn.write(b"exit\n")
# output = tn.read_all().decode("ascii")
output = tn.read_all().decode("utf-8")
if __name__ == '__main__':
# format: hostname:ipaddress:username:password
with open('devices.txt') as f:
devices = f.readlines
device_info = devices.split(":")
# script starting time
start = time.time()
processes = []
for device in device_info:
device_info = device.split(":")
processes.append(mp.Process(
target=run_device, args=(device_info,)))
for p in processes:
p.start()
for p in processes:
p.join()
# script ending time
end = time.time()
print('Script execution time:', end - start)
上記の例では、get_config関数が各IPアドレスごとに並列に呼び出され、Telnet接続が確立されます。telnetlibを使用して、 認証情報を入力し、show running-configコマンドを実行して機器の設定を取得します。 設定は各機器ごとに別個のテキストファイルに保存されます。
multiprocessing.Poolを使用して、プロセスプールを作成し、map関数を使用して各IPアドレスに対してget_config関数をマップします。 各関数呼び出しは独立して実行されるため、並列処理により効率的に機器から設定を取得できます。
データベースに保存#
取得したCisco機器のコンフィグをデータベースに保存するには、Pythonの標準ライブラリであるsqlite3を使用することができます。 以下は、先ほどの例にSQLite3を組み込んだコードの例です。
import telnetlib
import multiprocessing
import sqlite3
def get_config(ip):
tn = telnetlib.Telnet(ip)
tn.read_until(b"Username: ")
tn.write(b"<username>\n")
tn.read_until(b"Password: ")
tn.write(b"<password>\n")
tn.write(b"enable\n")
tn.read_until(b"Password: ")
tn.write(b"<enable_password>\n")
tn.write(b"terminal length 0\n")
tn.write(b"show running-config\n")
config = tn.read_until(b"#").decode("ascii")
tn.write(b"exit\n")
tn.close()
with sqlite3.connect("configs.db") as conn:
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS configs (id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT, config TEXT)")
c.execute("INSERT INTO configs (ip, config) VALUES (?, ?)", (ip, config))
conn.commit()
if __name__ == "__main__":
ips = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
with multiprocessing.Pool(processes=len(ips)) as pool:
pool.map(get_config, ips)
上記の例では、SQLite3を使用してconfigs.dbという名前のデータベースを作成し、 各IPアドレスのコンフィグをconfigsテーブルに挿入します。with文を使用してデータベース接続を確立し、 cursor()メソッドを使用してカーソルを取得します。CREATE TABLE IF NOT EXISTSを使用して、 configsテーブルが存在しない場合は新しく作成されます。INSERT INTOを使用して、ipとconfigの値をテーブルに挿入します。 最後に、commit()メソッドを使用してトランザクションを確定します。
この例では、with文を使用してデータベース接続を開始しているため、エラーが発生した場合に自動的にロールバックされます。 また、プロセスプール内で並列処理を実行するため、コンフィグの挿入が同時に行われる可能性があります。しかし、SQLite3はトランザクションによる排他制御を行うため、 同時に実行されたトランザクションは自動的にロックされ、順番に実行されます。
コマンド結果を取得#
- log フォルダに取得した結果が保存される
- 「IOS-XR_command.txt」に取得したいコマンドを記述する
import csv
import multiprocessing
import telnetlib
import socket
import logging
"""TODO:
CSV ファイルに変更
コマンド分離 -> 複数コマンドへの対応
"""
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# TODO: コードの分離
def get_router_info(host_info):
host = host_info['host']
username = host_info['username']
password = host_info['password']
router_type = host_info['router_type']
log_filename = f"./log/{host}_{router_type}-telnet.log"
result = []
try:
if router_type == 'cisco':
COMMAND_FILE = 'IOS-XR_command.txt'
with telnetlib.Telnet(host, timeout=10) as tn, open(log_filename, 'w') as f:
# tn.set_debuglevel(3) # debug 用
logging.info(f"Connect to {host}")
output = tn.read_until(b"Username:").decode('ascii').replace('\r\n','\n')
result.append(output)
tn.write(username.encode('ascii') + b"\n")
output = tn.read_until(b"Password:").decode('ascii').replace('\r\n','\n')
result.append(output)
tn.write(password.encode('ascii') + b"\n")
output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
result.append(output)
logging.info(f"Successfully login to {host}")
tn.write(b"term len 0\n")
output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
result.append(output)
# コマンドファイルからのコマンド入力
with open(COMMAND_FILE, 'r',encoding='utf-8') as cf:
for cmd in cf:
cmd = cmd.strip()
tn.write(cmd.encode('ascii') + b"\n")
output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
result.append(output)
# 結果の書き込み
for i in result:
f.write(i)
# TODO: JUNOS パターンの修正
elif router_type == 'juniper':
with telnetlib.Telnet(host, timeout=10) as tn, open(log_filename, 'w') as f:
# tn.set_debuglevel(3) # debug 用
logging.info(f"Connect to {host}")
tn.read_until(b"login: ")
tn.write(username.encode('ascii') + b"\n")
tn.read_until(b"Password: ")
tn.write(password.encode('ascii') + b"\n")
tn.read_until(b">")
logging.info(f"Successfully login to {host}")
tn.write(b"show version\n")
output = tn.read_until(b"#").decode('ascii').replace('\r\n','\n')
f.write(output)
tn.write(b"exit\n")
else:
return logging.error(f"Error: Unknown router_type {router_type} for host {host}")
return logging.info(f"Logout from {host}")
except (socket.timeout, OSError) as e:
return logging.error(f"Connection error {host}: {e}")
if __name__ == "__main__":
processes = []
# host_infos = [
# {"host": "192.168.126.132", "username": "cisco", "password": "cisco123","router_type": "cisco"},
# {"host": "192.168.126.133", "username": "cisco", "password": "cisco","router_type": "cisco"},
# {"host": "192.168.126.134", "username": "cisco", "password": "cisco","router_type": "cisco"},
# ]
with open('router_info.csv', 'r') as f:
reader = csv.DictReader(f)
host_infos = [row for row in reader]
for host_info in host_infos:
p = multiprocessing.Process(target=get_router_info, args=(host_info,))
processes.append(p)
p.start()
for p in processes:
p.join()
host,username,password,router_type
192.168.126.132,cisco,cisco123,cisco
192.168.126.133,cisco,cisco,cisco
192.168.126.134,cisco,cisco,cisco
取得した inventory を解析#
import re
import csv
def parse_inventory(output):
inventory = {}
pattern = re.compile(r'^NAME:\s+"(?P<name>.*)",\s+DESCR:\s+"(?P<description>.*)"')
for line in output.splitlines():
match = pattern.match(line)
if match:
device = match.groupdict()
pid_line = next(filter(lambda x: x.startswith('PID:'), output.splitlines()), None)
if pid_line:
device['pid'] = pid_line.split(':', 1)[1].strip()
vid_line = next(filter(lambda x: x.startswith('VID:'), output.splitlines()), None)
if vid_line:
device['vid'] = vid_line.split(':', 1)[1].strip()
sn_line = next(filter(lambda x: x.startswith('SN:'), output.splitlines()), None)
if sn_line:
device['sn'] = sn_line.split(':', 1)[1].strip()
inventory[device['name']] = device
return inventory
def write_to_csv(inventory_dict, output_file):
with open(output_file, mode='w', newline='') as file:
writer = csv.writer(file)
# CSVファイルのヘッダを書き込みます
writer.writerow(['Name', 'Description', 'PID', 'VID', 'SN'])
# 辞書の各要素をCSVファイルに書き込みます
for device in inventory_dict.values():
writer.writerow([device.get('name', ''),
device.get('description', ''),
device.get('pid', ''),
device.get('vid', ''),
device.get('sn', '')])
if __name__ == '__main__':
filename = 'inventory.csv'
with open(filename, 'w', encoding='utf-8') as f:
f.write()
解説#
pid_line = next(filter(lambda x: x.startswith('PID:'), output.splitlines()), None)
このコードは、出力された各行から、"PID:"で始まる行を取得するために使用されています。
output.splitlines()は、出力文字列を改行コードで分割して、行のリストを作成します。
filter(lambda x: x.startswith('PID:'), output.splitlines())は、上記の行のリストから、"PID:"で始まる行だけを抽出するフィルター関数を定義します。
lambda x: x.startswith('PID:')は、引数xが"PID:"で始まるかどうかをチェックする無名関数を定義しています。
最後に、next()関数を使用して、フィルターされた行の中で最初に見つかった行を取得しています。もし見つからなかった場合は、Noneを返します。
つまり、このコードは、"PID:"で始まる行がある場合はその行を、ない場合はNoneを取得するために使われています。同様の方法で、"VID:"と"SN:"の情報を取得しています。
!!! note ”改行の扱い" Telnetプロトコルでは改行コードとして"\r\n"を使うため、Telnetセッション中にコマンドを送信する際、改行を表すために"\r\n"を使用する必要があります。 しかし、場合によっては、"\r\n"がコマンドの一部として解釈されてしまい、不正なコマンドとして扱われることがあります。
この問題を回避するには、以下の2つの方法があります。
改行を表す文字列を"\r\n"ではなく、"\n"のみに変更する。
例えば、以下のようにコマンドを送信する際に改行コードを"\n"に変更することができます。
```
tn.write(b"show interfaces ge-0/0/0 | match input\n")
```
改行を表す文字列を"\r\n"にするが、コマンド末尾に空白文字列を付け足す。
空白文字列は、"\r\n"の直後にスペースを追加することで表現することができます。例えば、以下のようにコマンドを送信することができます。
```py
tn.write(b"show interfaces ge-0/0/0 | match input \r\n")
```
JUNOS json 形式を解析#
このコードでは、telnetlibを使用してJunosデバイスに接続し、show interfacesコマンドのJSON出力を取得します。 次に、jsonモジュールを使用してJSON出力を解析し、各インタフェースの名前と速度を表示します。 最後に、telnetセッションを終了します。
import telnetlib
import re
import json
# Junosデバイスに接続する
tn = telnetlib.Telnet("192.0.2.1")
tn.read_until(b"login: ")
tn.write(b"username\n")
tn.read_until(b"Password: ")
tn.write(b"password\n")
# show interfacesのJSON出力を取得する
tn.write(b"show interfaces | display json\n")
output = tn.read_until(b"}", timeout=5)
json_output = json.loads(output + b"}")
# JSON出力を解析する
interfaces = json_output['interface-information'][0]['physical-interface']
for interface in interfaces:
name = interface['name']
speed = interface['speed']
print(f'Interface {name} has speed {speed}bps')
# Junosデバイスから切断する
tn.write(b"exit\n")
tn.read_all()