微恐连续剧
67.38M · 2026-02-05
大家好,我是V哥。今天跟兄弟们聊聊Xshell的插件开发,教你怎么用Python把Xshell改造成你专属的运维神器。
说实话,Xshell这玩意儿用了这么多年,很多兄弟还停留在手动敲命令的阶段。其实它支持脚本扩展的,玩好了能省你一大半时间。今天V哥就把压箱底的货都掏出来,跟你好好唠唠。
很多兄弟不知道,Xshell其实支持三种脚本:VBScript、JScript和Python。咱们今天主攻Python,毕竟这玩意儿最顺手。
Xshell的脚本主要通过两种方式工作:
第一种是内置脚本引擎,直接在Xshell里面跑脚本,能调用Xshell提供的API。
第二种是外部程序配合,用Python写个独立程序,通过各种方式跟Xshell或者远程服务器交互。
咱们两种都讲,你根据实际需求选择。
先说Xshell自带的脚本功能,这个很多人不知道。
打开Xshell,点菜单栏的"工具" -> "脚本" -> "运行",就能执行脚本了。
来看看Xshell的Python脚本怎么写:
# hello_xshell.py
# 这是最简单的Xshell脚本
def Main():
# xsh是Xshell提供的全局对象
xsh.Session.Sleep(1000) # 等待1秒
# 向终端发送命令
xsh.Screen.Send("echo 'Hello from V哥的脚本'n")
# 等待命令执行完
xsh.Session.Sleep(500)
# 获取屏幕上的文本
result = xsh.Screen.Get(1, 1, xsh.Screen.CurrentRow, 80)
# 弹窗显示
xsh.Dialog.MsgBox("脚本执行完成!")
Main()
V哥给你整理一下Xshell提供的主要API对象:
"""
Xshell Python脚本 API 速查手册 - V哥整理
"""
def xshell_api_demo():
"""
xsh对象是Xshell自动注入的全局对象
包含以下主要子对象:
"""
# ========== Session对象 - 会话控制 ==========
xsh.Session.Open("ssh://user@host:22") # 打开新会话
xsh.Session.Close() # 关闭当前会话
xsh.Session.Sleep(1000) # 暂停毫秒数
xsh.Session.Connected # 是否已连接(只读)
xsh.Session.LocalAddress # 本地地址
xsh.Session.RemoteAddress # 远程地址
xsh.Session.Path # 会话文件路径
# ========== Screen对象 - 屏幕交互 ==========
xsh.Screen.Send("commandn") # 发送字符串到终端
xsh.Screen.Clear() # 清屏
xsh.Screen.CurrentRow # 当前行号
xsh.Screen.CurrentColumn # 当前列号
xsh.Screen.Columns # 屏幕列数
xsh.Screen.Rows # 屏幕行数
# 获取屏幕文本,参数是起始行、起始列、结束行、结束列
text = xsh.Screen.Get(1, 1, 24, 80)
# 等待特定字符串出现,超时秒数
xsh.Screen.WaitForString("$", 10)
# 同步执行,发送命令并等待提示符
xsh.Screen.Synchronous = True
# ========== Dialog对象 - 对话框 ==========
xsh.Dialog.MsgBox("消息内容") # 消息框
result = xsh.Dialog.Prompt("请输入", "默认值", False) # 输入框
# 第三个参数True表示密码模式
# ========== Clipboard对象 - 剪贴板 ==========
xsh.Clipboard.Text = "要复制的内容" # 写入剪贴板
content = xsh.Clipboard.Text # 读取剪贴板
xsh.Clipboard.Clear() # 清空剪贴板
# 注意:以上代码只能在Xshell内部运行
这个脚本能自动连接多台服务器,执行巡检命令,收集结果:
"""
服务器批量巡检脚本 - V哥出品
在Xshell中运行:工具 -> 脚本 -> 运行
"""
import datetime
# 服务器列表,实际使用时可以从文件读取
SERVERS = [
{"name": "Web服务器1", "host": "192.168.1.10", "user": "root", "pwd": "password1"},
{"name": "Web服务器2", "host": "192.168.1.11", "user": "root", "pwd": "password2"},
{"name": "DB服务器", "host": "192.168.1.20", "user": "root", "pwd": "password3"},
]
# 巡检命令列表
CHECK_COMMANDS = [
("主机名", "hostname"),
("系统负载", "uptime"),
("内存使用", "free -h"),
("磁盘使用", "df -h"),
("网络连接", "netstat -tunlp | head -20"),
]
def wait_for_prompt(timeout=10):
"""等待命令提示符"""
prompts = ["#", "$", ">"]
for prompt in prompts:
if xsh.Screen.WaitForString(prompt, timeout):
return True
return False
def send_command(cmd):
"""发送命令并获取结果"""
xsh.Screen.Clear()
xsh.Session.Sleep(200)
xsh.Screen.Send(cmd + "n")
xsh.Session.Sleep(1000) # 等待命令执行
wait_for_prompt(5)
# 获取屏幕内容
result = xsh.Screen.Get(1, 1, xsh.Screen.CurrentRow, xsh.Screen.Columns)
return result
def login_server(host, user, pwd):
"""登录服务器"""
# 发送SSH连接命令
xsh.Screen.Send(f"ssh {user}@{host}n")
xsh.Session.Sleep(2000)
# 处理首次连接的确认
screen_text = xsh.Screen.Get(1, 1, xsh.Screen.CurrentRow, 80)
if "yes/no" in screen_text or "fingerprint" in screen_text:
xsh.Screen.Send("yesn")
xsh.Session.Sleep(1000)
# 等待密码提示
if xsh.Screen.WaitForString("password:", 10):
xsh.Screen.Send(pwd + "n")
xsh.Session.Sleep(1500)
return True
return False
def check_single_server(server):
"""巡检单台服务器"""
report = []
report.append(f"n{'='*60}")
report.append(f"服务器: {server['name']} ({server['host']})")
report.append(f"巡检时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append('='*60)
# 登录服务器
if not login_server(server['host'], server['user'], server['pwd']):
report.append(" 登录失败!")
return 'n'.join(report)
report.append(" 登录成功")
# 执行巡检命令
for name, cmd in CHECK_COMMANDS:
report.append(f"n--- {name} ---")
result = send_command(cmd)
report.append(result)
# 退出当前服务器
xsh.Screen.Send("exitn")
xsh.Session.Sleep(500)
return 'n'.join(report)
def Main():
"""主函数"""
xsh.Dialog.MsgBox(f"即将开始巡检 {len(SERVERS)} 台服务器n点击确定开始")
all_reports = []
all_reports.append("=" * 60)
all_reports.append(" 服务器批量巡检报告 - V哥出品")
all_reports.append("=" * 60)
success_count = 0
fail_count = 0
for server in SERVERS:
try:
report = check_single_server(server)
all_reports.append(report)
success_count += 1
except Exception as e:
all_reports.append(f"n服务器 {server['name']} 巡检出错: {str(e)}")
fail_count += 1
# 生成汇总
all_reports.append("n" + "=" * 60)
all_reports.append(f"巡检完成!成功: {success_count}, 失败: {fail_count}")
all_reports.append("=" * 60)
# 保存报告到剪贴板
final_report = 'n'.join(all_reports)
xsh.Clipboard.Text = final_report
xsh.Dialog.MsgBox("巡检完成!报告已复制到剪贴板n你可以粘贴到文本编辑器保存")
Main()
"""
智能命令快捷输入 - V哥出品
预设常用命令,一键输入
"""
# 命令快捷键映射
COMMAND_SHORTCUTS = {
"1": ("查看系统信息", "uname -a && cat /etc/os-release"),
"2": ("查看内存", "free -h && cat /proc/meminfo | head -5"),
"3": ("查看磁盘", "df -h && lsblk"),
"4": ("查看进程TOP10", "ps aux --sort=-%mem | head -11"),
"5": ("查看网络连接", "netstat -tunlp"),
"6": ("查看系统日志", "tail -100 /var/log/messages 2>/dev/null || tail -100 /var/log/syslog"),
"7": ("查看登录历史", "last -20"),
"8": ("查看定时任务", "crontab -l && cat /etc/crontab"),
"9": ("Docker状态", "docker ps -a && docker images"),
"0": ("Nginx状态", "nginx -t && systemctl status nginx"),
}
def show_menu():
"""显示菜单"""
menu = "=== V哥的命令快捷菜单 ===nn"
for key, (name, cmd) in COMMAND_SHORTCUTS.items():
menu += f" [{key}] {name}n"
menu += "n [q] 退出n"
menu += "n请输入选项:"
return menu
def Main():
while True:
choice = xsh.Dialog.Prompt(show_menu(), "", False)
if choice is None or choice.lower() == 'q':
break
if choice in COMMAND_SHORTCUTS:
name, cmd = COMMAND_SHORTCUTS[choice]
# 确认执行
confirm = xsh.Dialog.MsgBox(f"即将执行: {name}nn命令: {cmd}nn确定执行吗?")
# 发送命令
xsh.Screen.Send(cmd + "n")
xsh.Session.Sleep(500)
else:
xsh.Dialog.MsgBox("无效选项,请重新输入")
Main()
很多时候Xshell内置脚本功能不够用,咱们需要开发独立的Python程序来配合。这部分才是真正的重头戏。
Paramiko是Python最牛的SSH库,能完全替代Xshell的核心功能:
"""
SSH连接管理器 - V哥出品
基于Paramiko实现,可以作为Xshell的补充工具
"""
import paramiko
import time
import threading
import queue
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('ssh_manager.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class ServerInfo:
"""服务器信息"""
name: str
host: str
port: int = 22
username: str = "root"
password: str = ""
key_file: str = ""
group: str = "默认分组"
class SSHConnection:
"""SSH连接封装类"""
def __init__(self, server: ServerInfo):
self.server = server
self.client = None
self.sftp = None
self.connected = False
def connect(self, timeout: int = 10) -> bool:
"""建立连接"""
try:
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
connect_params = {
'hostname': self.server.host,
'port': self.server.port,
'username': self.server.username,
'timeout': timeout,
}
# 优先使用密钥认证
if self.server.key_file and os.path.exists(self.server.key_file):
connect_params['key_filename'] = self.server.key_file
else:
connect_params['password'] = self.server.password
self.client.connect(**connect_params)
self.connected = True
logger.info(f"成功连接到 {self.server.name} ({self.server.host})")
return True
except paramiko.AuthenticationException:
logger.error(f"认证失败: {self.server.host}")
except paramiko.SSHException as e:
logger.error(f"SSH错误: {self.server.host} - {e}")
except Exception as e:
logger.error(f"连接失败: {self.server.host} - {e}")
return False
def execute(self, command: str, timeout: int = 30) -> Dict:
"""执行命令"""
if not self.connected:
return {'success': False, 'stdout': '', 'stderr': '未连接'}
try:
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
return {
'success': True,
'stdout': stdout.read().decode('utf-8', errors='ignore'),
'stderr': stderr.read().decode('utf-8', errors='ignore'),
'exit_code': stdout.channel.recv_exit_status()
}
except Exception as e:
return {'success': False, 'stdout': '', 'stderr': str(e)}
def execute_interactive(self, command: str, prompts: Dict[str, str] = None, timeout: int = 60) -> str:
"""
交互式命令执行
prompts: 提示符和回复的映射,比如 {"password:": "mypassword"}
"""
if not self.connected:
return "未连接"
prompts = prompts or {}
try:
channel = self.client.invoke_shell()
channel.settimeout(timeout)
time.sleep(0.5) # 等待shell就绪
channel.send(command + 'n')
output = ""
start_time = time.time()
while time.time() - start_time < timeout:
if channel.recv_ready():
chunk = channel.recv(4096).decode('utf-8', errors='ignore')
output += chunk
# 检查是否有需要回复的提示符
for prompt, response in prompts.items():
if prompt.lower() in output.lower():
channel.send(response + 'n')
time.sleep(0.3)
# 检查命令是否执行完成
if output.rstrip().endswith(('#', '$', '>')):
break
time.sleep(0.1)
channel.close()
return output
except Exception as e:
return f"执行出错: {e}"
def upload_file(self, local_path: str, remote_path: str) -> bool:
"""上传文件"""
if not self.connected:
return False
try:
if not self.sftp:
self.sftp = self.client.open_sftp()
self.sftp.put(local_path, remote_path)
logger.info(f"文件上传成功: {local_path} -> {remote_path}")
return True
except Exception as e:
logger.error(f"文件上传失败: {e}")
return False
def download_file(self, remote_path: str, local_path: str) -> bool:
"""下载文件"""
if not self.connected:
return False
try:
if not self.sftp:
self.sftp = self.client.open_sftp()
self.sftp.get(remote_path, local_path)
logger.info(f"文件下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
logger.error(f"文件下载失败: {e}")
return False
def close(self):
"""关闭连接"""
if self.sftp:
self.sftp.close()
if self.client:
self.client.close()
self.connected = False
logger.info(f"已断开 {self.server.name}")
class SSHManager:
"""SSH管理器 - 管理多台服务器"""
def __init__(self, config_file: str = "servers.json"):
self.config_file = config_file
self.servers: List[ServerInfo] = []
self.connections: Dict[str, SSHConnection] = {}
self.load_config()
def load_config(self):
"""加载服务器配置"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.servers = [ServerInfo(**s) for s in data]
logger.info(f"已加载 {len(self.servers)} 台服务器配置")
except Exception as e:
logger.error(f"加载配置失败: {e}")
def save_config(self):
"""保存服务器配置"""
try:
data = [asdict(s) for s in self.servers]
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info("配置已保存")
except Exception as e:
logger.error(f"保存配置失败: {e}")
def add_server(self, server: ServerInfo):
"""添加服务器"""
self.servers.append(server)
self.save_config()
def remove_server(self, name: str):
"""移除服务器"""
self.servers = [s for s in self.servers if s.name != name]
if name in self.connections:
self.connections[name].close()
del self.connections[name]
self.save_config()
def get_connection(self, name: str) -> Optional[SSHConnection]:
"""获取或创建连接"""
# 查找服务器
server = next((s for s in self.servers if s.name == name), None)
if not server:
logger.error(f"服务器不存在: {name}")
return None
# 检查是否已有连接
if name in self.connections and self.connections[name].connected:
return self.connections[name]
# 创建新连接
conn = SSHConnection(server)
if conn.connect():
self.connections[name] = conn
return conn
return None
def batch_execute(self, names: List[str], command: str,
max_workers: int = 10) -> Dict[str, Dict]:
"""
批量执行命令
使用多线程加速
"""
results = {}
result_queue = queue.Queue()
def worker(server_name):
conn = self.get_connection(server_name)
if conn:
result = conn.execute(command)
result['server'] = server_name
else:
result = {'success': False, 'server': server_name,
'stdout': '', 'stderr': '连接失败'}
result_queue.put(result)
# 启动线程
threads = []
for name in names:
t = threading.Thread(target=worker, args=(name,))
t.start()
threads.append(t)
# 控制并发数
if len(threads) >= max_workers:
for t in threads:
t.join()
threads = []
# 等待剩余线程
for t in threads:
t.join()
# 收集结果
while not result_queue.empty():
result = result_queue.get()
results[result['server']] = result
return results
def batch_execute_all(self, command: str) -> Dict[str, Dict]:
"""对所有服务器执行命令"""
names = [s.name for s in self.servers]
return self.batch_execute(names, command)
def close_all(self):
"""关闭所有连接"""
for conn in self.connections.values():
conn.close()
self.connections.clear()
# 使用示例
def demo():
"""演示如何使用"""
# 创建管理器
manager = SSHManager()
# 添加服务器(首次使用)
if not manager.servers:
manager.add_server(ServerInfo(
name="测试服务器1",
host="192.168.1.100",
username="root",
password="your_password"
))
manager.add_server(ServerInfo(
name="测试服务器2",
host="192.168.1.101",
username="root",
password="your_password"
))
# 单台服务器执行命令
conn = manager.get_connection("测试服务器1")
if conn:
result = conn.execute("uptime")
print(f"服务器负载: {result['stdout']}")
# 批量执行
results = manager.batch_execute_all("hostname && uptime")
for name, result in results.items():
print(f"n{name}:")
print(result['stdout'])
# 清理
manager.close_all()
if __name__ == "__main__":
demo()
光有命令行不够直观,咱们搞个图形界面:
"""
SSH图形化管理工具 - V哥出品
基于tkinter,不需要额外安装GUI库
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import threading
import paramiko
import json
import os
from datetime import datetime
class SSHManagerGUI:
def __init__(self):
self.window = tk.Tk()
self.window.title("V哥的SSH管理工具 v1.0")
self.window.geometry("1200x800")
self.servers = []
self.current_connection = None
self.config_file = "ssh_servers.json"
self.setup_ui()
self.load_servers()
def setup_ui(self):
"""设置界面"""
# 主框架
main_frame = ttk.Frame(self.window)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 左侧面板 - 服务器列表
left_frame = ttk.LabelFrame(main_frame, text="服务器列表", width=300)
left_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))
left_frame.pack_propagate(False)
# 服务器列表
self.server_listbox = tk.Listbox(left_frame, width=35, height=20)
self.server_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.server_listbox.bind('<<ListboxSelect>>', self.on_server_select)
self.server_listbox.bind('<Double-Button-1>', self.on_server_double_click)
# 服务器管理按钮
btn_frame = ttk.Frame(left_frame)
btn_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(btn_frame, text="添加", command=self.add_server_dialog).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="编辑", command=self.edit_server_dialog).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="删除", command=self.delete_server).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="连接", command=self.connect_server).pack(side=tk.LEFT, padx=2)
# 右侧面板
right_frame = ttk.Frame(main_frame)
right_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 连接状态
status_frame = ttk.Frame(right_frame)
status_frame.pack(fill=tk.X, pady=(0, 10))
self.status_label = ttk.Label(status_frame, text="状态: 未连接", foreground="gray")
self.status_label.pack(side=tk.LEFT)
ttk.Button(status_frame, text="断开", command=self.disconnect).pack(side=tk.RIGHT)
# 命令输入区
cmd_frame = ttk.LabelFrame(right_frame, text="命令执行")
cmd_frame.pack(fill=tk.X, pady=(0, 10))
self.cmd_entry = ttk.Entry(cmd_frame, width=80)
self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5, pady=5)
self.cmd_entry.bind('<Return>', lambda e: self.execute_command())
ttk.Button(cmd_frame, text="执行", command=self.execute_command).pack(side=tk.LEFT, padx=5)
ttk.Button(cmd_frame, text="清屏", command=self.clear_output).pack(side=tk.LEFT, padx=5)
# 快捷命令
quick_frame = ttk.LabelFrame(right_frame, text="快捷命令")
quick_frame.pack(fill=tk.X, pady=(0, 10))
quick_commands = [
("系统信息", "uname -a"),
("内存", "free -h"),
("磁盘", "df -h"),
("进程", "ps aux --sort=-%mem | head -15"),
("网络", "netstat -tunlp"),
("Docker", "docker ps -a"),
]
for i, (name, cmd) in enumerate(quick_commands):
btn = ttk.Button(quick_frame, text=name,
command=lambda c=cmd: self.quick_execute(c))
btn.pack(side=tk.LEFT, padx=3, pady=5)
# 输出区域
output_frame = ttk.LabelFrame(right_frame, text="输出")
output_frame.pack(fill=tk.BOTH, expand=True)
self.output_text = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD,
font=('Consolas', 10))
self.output_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 批量执行区域
batch_frame = ttk.LabelFrame(right_frame, text="批量执行")
batch_frame.pack(fill=tk.X, pady=(10, 0))
ttk.Button(batch_frame, text="选择服务器",
command=self.select_servers_dialog).pack(side=tk.LEFT, padx=5, pady=5)
ttk.Button(batch_frame, text="批量执行",
command=self.batch_execute_dialog).pack(side=tk.LEFT, padx=5, pady=5)
ttk.Button(batch_frame, text="导出结果",
command=self.export_results).pack(side=tk.LEFT, padx=5, pady=5)
def load_servers(self):
"""加载服务器列表"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
self.servers = json.load(f)
self.refresh_server_list()
except:
pass
def save_servers(self):
"""保存服务器列表"""
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.servers, f, ensure_ascii=False, indent=2)
def refresh_server_list(self):
"""刷新服务器列表显示"""
self.server_listbox.delete(0, tk.END)
for server in self.servers:
status = "●" if server.get('connected') else "○"
self.server_listbox.insert(tk.END, f"{status} {server['name']} ({server['host']})")
def add_server_dialog(self):
"""添加服务器对话框"""
dialog = tk.Toplevel(self.window)
dialog.title("添加服务器")
dialog.geometry("400x300")
dialog.transient(self.window)
dialog.grab_set()
fields = [
("名称", "name", ""),
("主机", "host", ""),
("端口", "port", "22"),
("用户名", "username", "root"),
("密码", "password", ""),
]
entries = {}
for i, (label, key, default) in enumerate(fields):
ttk.Label(dialog, text=label + ":").grid(row=i, column=0, padx=10, pady=5, sticky="e")
entry = ttk.Entry(dialog, width=30)
entry.insert(0, default)
if key == "password":
entry.config(show="*")
entry.grid(row=i, column=1, padx=10, pady=5)
entries[key] = entry
def save():
server = {key: entry.get() for key, entry in entries.items()}
server['port'] = int(server['port'])
self.servers.append(server)
self.save_servers()
self.refresh_server_list()
dialog.destroy()
ttk.Button(dialog, text="保存", command=save).grid(row=len(fields), column=0,
columnspan=2, pady=20)
def edit_server_dialog(self):
"""编辑服务器"""
selection = self.server_listbox.curselection()
if not selection:
messagebox.showwarning("提示", "请先选择一个服务器")
return
index = selection[0]
server = self.servers[index]
dialog = tk.Toplevel(self.window)
dialog.title("编辑服务器")
dialog.geometry("400x300")
dialog.transient(self.window)
dialog.grab_set()
fields = [
("名称", "name"),
("主机", "host"),
("端口", "port"),
("用户名", "username"),
("密码", "password"),
]
entries = {}
for i, (label, key) in enumerate(fields):
ttk.Label(dialog, text=label + ":").grid(row=i, column=0, padx=10, pady=5, sticky="e")
entry = ttk.Entry(dialog, width=30)
entry.insert(0, str(server.get(key, '')))
if key == "password":
entry.config(show="*")
entry.grid(row=i, column=1, padx=10, pady=5)
entries[key] = entry
def save():
for key, entry in entries.items():
value = entry.get()
if key == 'port':
value = int(value)
self.servers[index][key] = value
self.save_servers()
self.refresh_server_list()
dialog.destroy()
ttk.Button(dialog, text="保存", command=save).grid(row=len(fields), column=0,
columnspan=2, pady=20)
def delete_server(self):
"""删除服务器"""
selection = self.server_listbox.curselection()
if not selection:
messagebox.showwarning("提示", "请先选择一个服务器")
return
if messagebox.askyesno("确认", "确定要删除这个服务器吗?"):
del self.servers[selection[0]]
self.save_servers()
self.refresh_server_list()
def on_server_select(self, event):
"""选择服务器事件"""
pass
def on_server_double_click(self, event):
"""双击连接服务器"""
self.connect_server()
def connect_server(self):
"""连接服务器"""
selection = self.server_listbox.curselection()
if not selection:
messagebox.showwarning("提示", "请先选择一个服务器")
return
server = self.servers[selection[0]]
self.status_label.config(text=f"状态: 正在连接 {server['name']}...", foreground="orange")
self.window.update()
def connect_thread():
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=server['host'],
port=server.get('port', 22),
username=server['username'],
password=server['password'],
timeout=10
)
self.current_connection = client
self.window.after(0, lambda: self.on_connected(server))
except Exception as e:
self.window.after(0, lambda: self.on_connect_error(str(e)))
threading.Thread(target=connect_thread, daemon=True).start()
def on_connected(self, server):
"""连接成功回调"""
self.status_label.config(
text=f"状态: 已连接 {server['name']} ({server['host']})",
foreground="green"
)
self.append_output(f"n{'='*50}n")
self.append_output(f"已连接到 {server['name']} ({server['host']})n")
self.append_output(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}n")
self.append_output(f"{'='*50}nn")
def on_connect_error(self, error):
"""连接失败回调"""
self.status_label.config(text="状态: 连接失败", foreground="red")
messagebox.showerror("连接失败", error)
def disconnect(self):
"""断开连接"""
if self.current_connection:
self.current_connection.close()
self.current_connection = None
self.status_label.config(text="状态: 未连接", foreground="gray")
self.append_output("n[已断开连接]n")
def execute_command(self):
"""执行命令"""
if not self.current_connection:
messagebox.showwarning("提示", "请先连接服务器")
return
command = self.cmd_entry.get().strip()
if not command:
return
self.cmd_entry.delete(0, tk.END)
self.append_output(f"n$ {command}n")
def execute_thread():
try:
stdin, stdout, stderr = self.current_connection.exec_command(command, timeout=30)
output = stdout.read().decode('utf-8', errors='ignore')
error = stderr.read().decode('utf-8', errors='ignore')
self.window.after(0, lambda: self.append_output(output))
if error:
self.window.after(0, lambda: self.append_output(f"[错误] {error}"))
except Exception as e:
self.window.after(0, lambda: self.append_output(f"[执行失败] {e}n"))
threading.Thread(target=execute_thread, daemon=True).start()
def quick_execute(self, command):
"""快捷命令执行"""
self.cmd_entry.delete(0, tk.END)
self.cmd_entry.insert(0, command)
self.execute_command()
def append_output(self, text):
"""添加输出"""
self.output_text.insert(tk.END, text)
self.output_text.see(tk.END)
def clear_output(self):
"""清空输出"""
self.output_text.delete(1.0, tk.END)
def select_servers_dialog(self):
"""选择多台服务器对话框"""
dialog = tk.Toplevel(self.window)
dialog.title("选择服务器")
dialog.geometry("300x400")
dialog.transient(self.window)
dialog.grab_set()
# 多选列表
listbox = tk.Listbox(dialog, selectmode=tk.MULTIPLE, height=15)
listbox.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
for server in self.servers:
listbox.insert(tk.END, f"{server['name']} ({server['host']})")
self.selected_servers = []
def confirm():
selections = listbox.curselection()
self.selected_servers = [self.servers[i] for i in selections]
dialog.destroy()
if self.selected_servers:
messagebox.showinfo("提示", f"已选择 {len(self.selected_servers)} 台服务器")
ttk.Button(dialog, text="确定", command=confirm).pack(pady=10)
def batch_execute_dialog(self):
"""批量执行对话框"""
if not hasattr(self, 'selected_servers') or not self.selected_servers:
messagebox.showwarning("提示", "请先选择服务器")
return
dialog = tk.Toplevel(self.window)
dialog.title("批量执行命令")
dialog.geometry("500x300")
dialog.transient(self.window)
dialog.grab_set()
ttk.Label(dialog, text="输入要执行的命令:").pack(padx=10, pady=10)
cmd_text = scrolledtext.ScrolledText(dialog, height=5, width=50)
cmd_text.pack(padx=10, pady=5)
result_text = scrolledtext.ScrolledText(dialog, height=10, width=50)
result_text.pack(padx=10, pady=5)
def execute():
command = cmd_text.get(1.0, tk.END).strip()
if not command:
return
result_text.delete(1.0, tk.END)
result_text.insert(tk.END, "正在执行...n")
def batch_thread():
for server in self.selected_servers:
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=server['host'],
port=server.get('port', 22),
username=server['username'],
password=server['password'],
timeout=10
)
stdin, stdout, stderr = client.exec_command(command)
output = stdout.read().decode('utf-8', errors='ignore')
result = f"n{'='*40}n{server['name']} ({server['host']}):n{output}"
self.window.after(0, lambda r=result: result_text.insert(tk.END, r))
client.close()
except Exception as e:
error = f"n{server['name']}: 失败 - {e}"
self.window.after(0, lambda r=error: result_text.insert(tk.END, r))
self.window.after(0, lambda: result_text.insert(tk.END, "nn执行完成!"))
threading.Thread(target=batch_thread, daemon=True).start()
ttk.Button(dialog, text="执行", command=execute).pack(pady=10)
def export_results(self):
"""导出结果"""
content = self.output_text.get(1.0, tk.END)
if not content.strip():
messagebox.showwarning("提示", "没有可导出的内容")
return
filename = filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")]
)
if filename:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
messagebox.showinfo("成功", f"已保存到 {filename}")
def run(self):
"""运行程序"""
self.window.mainloop()
if __name__ == "__main__":
app = SSHManagerGUI()
app.run()
这个工具可以跟Xshell配合使用,生成配置、管理会话:
"""
Xshell会话管理辅助工具 - V哥出品
功能:批量生成和管理Xshell会话文件
"""
import os
import json
from pathlib import Path
from typing import List, Dict
import configparser
import base64
class XshellSessionManager:
"""Xshell会话管理器"""
def __init__(self, sessions_path: str = None):
# Xshell默认会话目录
if sessions_path:
self.sessions_path = Path(sessions_path)
else:
# 尝试找到Xshell会话目录
home = Path.home()
possible_paths = [
home / "Documents" / "NetSarang Computer" / "7" / "Xshell" / "Sessions",
home / "Documents" / "NetSarang Computer" / "6" / "Xshell" / "Sessions",
home / "Documents" / "NetSarang" / "Xshell" / "Sessions",
]
for p in possible_paths:
if p.exists():
self.sessions_path = p
break
else:
self.sessions_path = Path("./xshell_sessions")
self.sessions_path.mkdir(exist_ok=True)
print(f"会话目录: {self.sessions_path}")
def create_session_file(self, name: str, host: str, port: int = 22,
username: str = "", password: str = "",
key_file: str = "", folder: str = "") -> str:
"""
创建Xshell会话文件 (.xsh)
Xshell 6/7 使用的是类似INI格式的配置文件
"""
session_content = f"""[CONNECTION]
Host={host}
Port={port}
UserName={username}
Protocol=SSH
[CONNECTION:AUTHENTICATION]
UseSystemCerts=0
KeyExchangeAlgorithms=
HostKeyAlgorithms=
Ciphers=
MACs=
AuthenticationOrder=gssapi-with-mic,publickey,keyboard-interactive,password
[CONNECTION:PROXY]
Type=0
Host=
Port=0
UserName=
Password=
[CONNECTION:FOLDER]
Path={folder}
[SESSION]
LocalEcho=0
CIK=0
LogDateFormat=0
Logging=0
LogFileAppend=0
LogFileName=
"""
# 密码加密(Xshell使用特定格式,这里简化处理)
if password:
# 注意:Xshell的密码加密比较复杂,这里只是示例
# 实际使用中建议使用密钥认证或手动输入密码
session_content += f"""
[CONNECTION:AUTHENTICATION:PASSWORD]
Password={self._simple_encode(password)}
"""
if key_file:
session_content += f"""
[CONNECTION:AUTHENTICATION:PUBLICKEY]
KeyFile={key_file}
"""
# 确保目标目录存在
if folder:
target_dir = self.sessions_path / folder
target_dir.mkdir(parents=True, exist_ok=True)
file_path = target_dir / f"{name}.xsh"
else:
file_path = self.sessions_path / f"{name}.xsh"
# 写入文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(session_content)
print(f"已创建会话: {file_path}")
return str(file_path)
def _simple_encode(self, text: str) -> str:
"""简单编码(非安全加密,仅作演示)"""
return base64.b64encode(text.encode()).decode()
def batch_create_from_json(self, json_file: str):
"""
从JSON文件批量创建会话
JSON格式示例:
[
{"name": "服务器1", "host": "192.168.1.1", "username": "root", "folder": "生产环境"},
{"name": "服务器2", "host": "192.168.1.2", "username": "root", "folder": "测试环境"}
]
"""
with open(json_file, 'r', encoding='utf-8') as f:
servers = json.load(f)
for server in servers:
self.create_session_file(**server)
print(f"n批量创建完成!共 {len(servers)} 个会话")
def batch_create_from_csv(self, csv_file: str):
"""
从CSV文件批量创建会话
CSV格式: name,host,port,username,password,folder
"""
import csv
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
count = 0
for row in reader:
if 'port' in row and row['port']:
row['port'] = int(row['port'])
else:
row['port'] = 22
self.create_session_file(**row)
count += 1
print(f"n批量创建完成!共 {count} 个会话")
def list_sessions(self, folder: str = "") -> List[Dict]:
"""列出所有会话"""
sessions = []
search_path = self.sessions_path / folder if folder else self.sessions_path
for xsh_file in search_path.rglob("*.xsh"):
try:
config = configparser.ConfigParser()
config.read(xsh_file, encoding='utf-8')
session_info = {
'name': xsh_file.stem,
'file': str(xsh_file),
'host': config.get('CONNECTION', 'Host', fallback=''),
'port': config.get('CONNECTION', 'Port', fallback='22'),
'username': config.get('CONNECTION', 'UserName', fallback=''),
}
sessions.append(session_info)
except:
pass
return sessions
def export_sessions_to_json(self, output_file: str, folder: str = ""):
"""导出会话列表到JSON"""
sessions = self.list_sessions(folder)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
print(f"已导出 {len(sessions)} 个会话到 {output_file}")
def search_sessions(self, keyword: str) -> List[Dict]:
"""搜索会话"""
all_sessions = self.list_sessions()
results = []
keyword = keyword.lower()
for session in all_sessions:
if (keyword in session['name'].lower() or
keyword in session['host'].lower()):
results.append(session)
return results
def generate_connect_script(self, sessions: List[Dict], output_file: str):
"""
生成批量连接脚本
可以在Xshell中直接运行
"""
script_content = '''"""
批量连接脚本 - V哥出品
在Xshell中运行: 工具 -> 脚本 -> 运行
"""
def Main():
servers = {servers_json}
for server in servers:
xsh.Dialog.MsgBox(f"即将连接: {{server['name']}}")
# 构建SSH URL
url = f"ssh://{{server['username']}}@{{server['host']}}:{{server['port']}}"
# 打开会话
xsh.Session.Open(url)
xsh.Session.Sleep(2000)
# 等待连接
if xsh.Session.Connected:
xsh.Dialog.MsgBox(f"{{server['name']}} 连接成功")
else:
xsh.Dialog.MsgBox(f"{{server['name']}} 连接失败")
Main()
'''.format(servers_json=json.dumps(sessions, ensure_ascii=False, indent=4))
with open(output_file, 'w', encoding='utf-8') as f:
f.write(script_content)
print(f"连接脚本已生成: {output_file}")
def main():
"""演示用法"""
manager = XshellSessionManager()
# 单个创建
manager.create_session_file(
name="测试服务器",
host="192.168.1.100",
port=22,
username="root",
folder="测试环境"
)
# 批量创建示例JSON
sample_servers = [
{"name": "Web-01", "host": "192.168.1.10", "username": "root", "folder": "生产/Web"},
{"name": "Web-02", "host": "192.168.1.11", "username": "root", "folder": "生产/Web"},
{"name": "DB-Master", "host": "192.168.1.20", "username": "root", "folder": "生产/DB"},
{"name": "DB-Slave", "host": "192.168.1.21", "username": "root", "folder": "生产/DB"},
{"name": "Test-01", "host": "192.168.2.10", "username": "deploy", "folder": "测试"},
]
# 保存示例JSON
with open("sample_servers.json", 'w', encoding='utf-8') as f:
json.dump(sample_servers, f, ensure_ascii=False, indent=2)
# 批量创建
manager.batch_create_from_json("sample_servers.json")
# 列出会话
print("n当前会话列表:")
for session in manager.list_sessions():
print(f" - {session['name']}: {session['host']}")
if __name__ == "__main__":
main()
把前面的东西整合一下,搞个完整的工具:
"""
V哥运维工具箱 - 终极版
集成了所有功能的一站式运维平台
"""
import sys
import os
# 确保能找到模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from typing import List, Dict, Optional
import json
import time
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import paramiko
class VOperationPlatform:
"""V哥运维平台"""
def __init__(self, config_file: str = "vops_config.json"):
self.config_file = config_file
self.servers: List[Dict] = []
self.groups: Dict[str, List[str]] = {}
self.command_history: List[Dict] = []
self.task_results: List[Dict] = []
self.load_config()
def load_config(self):
"""加载配置"""
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
self.servers = config.get('servers', [])
self.groups = config.get('groups', {})
def save_config(self):
"""保存配置"""
config = {
'servers': self.servers,
'groups': self.groups
}
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# ========== 服务器管理 ==========
def add_server(self, name: str, host: str, port: int = 22,
username: str = "root", password: str = "",
key_file: str = "", groups: List[str] = None):
"""添加服务器"""
server = {
'name': name,
'host': host,
'port': port,
'username': username,
'password': password,
'key_file': key_file,
'groups': groups or []
}
self.servers.append(server)
# 更新分组
for group in (groups or []):
if group not in self.groups:
self.groups[group] = []
if name not in self.groups[group]:
self.groups[group].append(name)
self.save_config()
print(f" 服务器已添加: {name} ({host})")
def remove_server(self, name: str):
"""移除服务器"""
self.servers = [s for s in self.servers if s['name'] != name]
for group in self.groups.values():
if name in group:
group.remove(name)
self.save_config()
print(f" 服务器已移除: {name}")
def get_servers_by_group(self, group: str) -> List[Dict]:
"""按分组获取服务器"""
server_names = self.groups.get(group, [])
return [s for s in self.servers if s['name'] in server_names]
# ========== 命令执行 ==========
def _connect(self, server: Dict) -> Optional[paramiko.SSHClient]:
"""建立SSH连接"""
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
connect_args = {
'hostname': server['host'],
'port': server['port'],
'username': server['username'],
'timeout': 10
}
if server.get('key_file') and os.path.exists(server['key_file']):
connect_args['key_filename'] = server['key_file']
else:
connect_args['password'] = server['password']
client.connect(**connect_args)
return client
except Exception as e:
print(f" 连接失败 {server['name']}: {e}")
return None
def execute_on_server(self, server: Dict, command: str) -> Dict:
"""在单台服务器上执行命令"""
result = {
'server': server['name'],
'host': server['host'],
'command': command,
'success': False,
'stdout': '',
'stderr': '',
'time': datetime.now().isoformat()
}
client = self._connect(server)
if not client:
result['stderr'] = '连接失败'
return result
try:
stdin, stdout, stderr = client.exec_command(command, timeout=60)
result['stdout'] = stdout.read().decode('utf-8', errors='ignore')
result['stderr'] = stderr.read().decode('utf-8', errors='ignore')
result['exit_code'] = stdout.channel.recv_exit_status()
result['success'] = result['exit_code'] == 0
except Exception as e:
result['stderr'] = str(e)
finally:
client.close()
return result
def batch_execute(self, server_names: List[str], command: str,
max_workers: int = 10) -> List[Dict]:
"""批量执行命令"""
servers = [s for s in self.servers if s['name'] in server_names]
results = []
print(f"n{'='*60}")
print(f"批量执行命令: {command}")
print(f"目标服务器: {len(servers)} 台")
print(f"{'='*60}n")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.execute_on_server, server, command): server
for server in servers
}
for future in as_completed(futures):
server = futures[future]
result = future.result()
results.append(result)
status = "" if result['success'] else ""
print(f"{status} {result['server']}: {result['stdout'][:100]}...")
# 记录历史
self.command_history.append({
'command': command,
'servers': server_names,
'time': datetime.now().isoformat(),
'results': results
})
return results
def execute_on_group(self, group: str, command: str) -> List[Dict]:
"""对整个分组执行命令"""
server_names = self.groups.get(group, [])
if not server_names:
print(f"分组 {group} 没有服务器")
return []
return self.batch_execute(server_names, command)
def execute_on_all(self, command: str) -> List[Dict]:
"""对所有服务器执行命令"""
server_names = [s['name'] for s in self.servers]
return self.batch_execute(server_names, command)
# ========== 文件操作 ==========
def upload_file(self, server_names: List[str], local_path: str,
remote_path: str) -> Dict[str, bool]:
"""批量上传文件"""
results = {}
for name in server_names:
server = next((s for s in self.servers if s['name'] == name), None)
if not server:
results[name] = False
continue
client = self._connect(server)
if not client:
results[name] = False
continue
try:
sftp = client.open_sftp()
sftp.put(local_path, remote_path)
sftp.close()
results[name] = True
print(f" 上传成功: {name}")
except Exception as e:
results[name] = False
print(f" 上传失败 {name}: {e}")
finally:
client.close()
return results
def download_file(self, server_name: str, remote_path: str,
local_path: str) -> bool:
"""下载文件"""
server = next((s for s in self.servers if s['name'] == server_name), None)
if not server:
return False
client = self._connect(server)
if not client:
return False
try:
sftp = client.open_sftp()
sftp.get(remote_path, local_path)
sftp.close()
print(f" 下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
print(f" 下载失败: {e}")
return False
finally:
client.close()
# ========== 坚控检查 ==========
def health_check(self, server_names: List[str] = None) -> List[Dict]:
"""健康检查"""
if server_names is None:
server_names = [s['name'] for s in self.servers]
check_commands = {
'uptime': 'uptime',
'memory': "free -h | grep Mem | awk '{print $3"/"$2}'",
'disk': "df -h / | tail -1 | awk '{print $5}'",
'load': "cat /proc/loadavg | awk '{print $1,$2,$3}'",
'cpu_count': "nproc",
}
results = []
for name in server_names:
server = next((s for s in self.servers if s['name'] == name), None)
if not server:
continue
health = {
'server': name,
'host': server['host'],
'status': 'unknown',
'metrics': {}
}
client = self._connect(server)
if not client:
health['status'] = 'offline'
results.append(health)
continue
try:
for metric, cmd in check_commands.items():
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode().strip()
health['metrics'][metric] = output
health['status'] = 'online'
except Exception as e:
health['status'] = 'error'
health['error'] = str(e)
finally:
client.close()
results.append(health)
# 打印结果
print(f"n{'='*70}")
print(f"{'服务器':<20} {'状态':<10} {'负载':<15} {'内存':<10} {'磁盘':<10}")
print(f"{'='*70}")
for r in results:
metrics = r.get('metrics', {})
print(f"{r['server']:<20} {r['status']:<10} "
f"{metrics.get('load', 'N/A'):<15} "
f"{metrics.get('memory', 'N/A'):<10} "
f"{metrics.get('disk', 'N/A'):<10}")
print(f"{'='*70}n")
return results
# ========== 报告生成 ==========
def generate_report(self, results: List[Dict], output_file: str):
"""生成执行报告"""
report = []
report.append("=" * 60)
report.append(" V哥运维平台 - 执行报告")
report.append("=" * 60)
report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"服务器数量: {len(results)}")
report.append("")
success_count = sum(1 for r in results if r.get('success'))
fail_count = len(results) - success_count
report.append(f"成功: {success_count} 失败: {fail_count}")
report.append("")
for r in results:
status = "" if r.get('success') else ""
report.append(f"{status} {r['server']} ({r.get('host', '')})")
if r.get('stdout'):
report.append(f" 输出: {r['stdout'][:200]}")
if r.get('stderr'):
report.append(f" 错误: {r['stderr'][:200]}")
report.append("")
report_text = 'n'.join(report)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(report_text)
print(f"报告已生成: {output_file}")
return report_text
# ========== 交互式菜单 ==========
def interactive_menu(self):
"""交互式菜单"""
while True:
print("n" + "=" * 40)
print(" V哥运维平台 v1.0")
print("=" * 40)
print("1. 查看服务器列表")
print("2. 添加服务器")
print("3. 执行命令(单台)")
print("4. 批量执行命令")
print("5. 健康检查")
print("6. 上传文件")
print("7. 下载文件")
print("0. 退出")
print("=" * 40)
choice = input("请选择: ").strip()
if choice == '0':
print("再见!")
break
elif choice == '1':
self._menu_list_servers()
elif choice == '2':
self._menu_add_server()
elif choice == '3':
self._menu_execute_single()
elif choice == '4':
self._menu_batch_execute()
elif choice == '5':
self.health_check()
elif choice == '6':
self._menu_upload()
elif choice == '7':
self._menu_download()
else:
print("无效选项")
def _menu_list_servers(self):
print("n服务器列表:")
print("-" * 50)
for i, s in enumerate(self.servers):
groups = ', '.join(s.get('groups', []))
print(f"{i+1}. {s['name']:<20} {s['host']:<15} [{groups}]")
def _menu_add_server(self):
name = input("名称: ").strip()
host = input("主机: ").strip()
port = input("端口 [22]: ").strip() or "22"
username = input("用户名 [root]: ").strip() or "root"
password = input("密码: ").strip()
groups = input("分组(逗号分隔): ").strip()
groups = [g.strip() for g in groups.split(',')] if groups else []
self.add_server(name, host, int(port), username, password, groups=groups)
def _menu_execute_single(self):
self._menu_list_servers()
idx = int(input("选择服务器编号: ")) - 1
if 0 <= idx < len(self.servers):
command = input("输入命令: ").strip()
result = self.execute_on_server(self.servers[idx], command)
print(f"n输出:n{result['stdout']}")
if result['stderr']:
print(f"错误:n{result['stderr']}")
def _menu_batch_execute(self):
self._menu_list_servers()
indices = input("选择服务器(逗号分隔,如 1,2,3 或 all): ").strip()
if indices.lower() == 'all':
names = [s['name'] for s in self.servers]
else:
indices = [int(i.strip()) - 1 for i in indices.split(',')]
names = [self.servers[i]['name'] for i in indices if 0 <= i < len(self.servers)]
command = input("输入命令: ").strip()
results = self.batch_execute(names, command)
save = input("是否保存报告?(y/n): ").strip().lower()
if save == 'y':
self.generate_report(results, f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
def _menu_upload(self):
self._menu_list_servers()
indices = input("选择服务器(逗号分隔): ").strip()
indices = [int(i.strip()) - 1 for i in indices.split(',')]
names = [self.servers[i]['name'] for i in indices if 0 <= i < len(self.servers)]
local_path = input("本地文件路径: ").strip()
remote_path = input("远程路径: ").strip()
self.upload_file(names, local_path, remote_path)
def _menu_download(self):
self._menu_list_servers()
idx = int(input("选择服务器编号: ")) - 1
if 0 <= idx < len(self.servers):
remote_path = input("远程文件路径: ").strip()
local_path = input("本地保存路径: ").strip()
self.download_file(self.servers[idx]['name'], remote_path, local_path)
if __name__ == "__main__":
platform = VOperationPlatform()
# 如果没有服务器,添加演示数据
if not platform.servers:
print("首次运行,添加演示服务器...")
platform.add_server("Demo-Server", "demo.example.com", 22, "root", "password",
groups=["演示"])
platform.interactive_menu()
聊了这么多,最后V哥给你总结几点实战经验:
1. 能用密钥就别用密码
密钥认证比密码安全多了,配置起来也不麻烦:
# 生成密钥对
ssh-keygen -t rsa -b 4096
# 复制公钥到服务器
ssh-copy-id user@host
2. 做好日志记录
运维工具一定要有日志,出了问题能查:
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('vops.log', encoding='utf-8'),
logging.StreamHandler()
]
)
3. 控制并发,别把服务器搞挂了
批量执行的时候控制好并发数,别一下子全上。
4. 命令执行前三思
尤其是批量操作,执行前一定要确认命令没问题,rm -rf 这种命令要格外小心。
5. 定期备份配置
服务器配置文件、密码这些都是敏感信息,做好备份和加密。
好了兄弟们,今天关于Xshell插件开发和Python运维工具的内容就讲到这儿。从简单的Xshell内置脚本,到独立的Python运维平台,V哥都给你掰扯明白了。
工具是死的,人是活的。这些代码你可以直接拿去用,但更重要的是理解背后的思路,这样遇到新需求你也能自己搞定。
有问题评论区见,V哥有空就回。下期再见!
V哥原创,转载请注明出处