使用python批量管理linux设备
为了省赛比赛过程中批量管理选手机,写了一个python文件,记录一下方便以后使用。
# 代码
from fabric import Connection
from typing import List
from threading import Thread,Lock
from datetime import datetime
import time
import os
PRIVATE_KEY = "id_rsa"
ADMIN_USER = "liyaning"
NORMAL_USER = "jscpc"
SUDO_PASSWORD = "123"
UNLOCK_COMMAND = {'command':"export DISPLAY=:0; gnome-screensaver; gnome-screensaver-command -d;",'user':NORMAL_USER}
LOCK_COMMAND = {'command':"export DISPLAY=:0; gnome-screensaver; gnome-screensaver-command -l;",'user':ADMIN_USER}
LOGOUT_COMMAND = {'command':"gnome-session-quit --force",'user':NORMAL_USER}
CLEAR_COMMAND = {'command':["rm -rf /tmp/*", "rm -rf /var/tmp/*", "rsync -av --delete /opt/backup/jscpc-backup/jscpc /home"],'user':ADMIN_USER}
COMMAND = CLEAR_COMMAND
class Result:
def __init__(self,server,success,msg):
self.server = server
self.success = success
self.msg = msg
def generage_servers()->List[Connection]:
return [f'192.168.{x}.{y}' for x in range(1,2) for y in range(103,104)]
def fun(server:str):
conn = Connection(server,user = COMMAND['user'])
try:
if COMMAND['user'] == ADMIN_USER:
for comm in COMMAND['command']:
msg:str = str(conn.sudo(comm,hide="both",password=SUDO_PASSWORD))
else:
msg:str = str(conn.run(COMMAND['command'],hide="both"))
msg = msg.replace("\n","").replace("\r","")
result = Result(server,True,msg)
success_lock.acquire()
successes.append(result)
success_lock.release()
except Exception as err:
msg = str(err)
msg = msg.replace("\n","").replace("\r","")
result = Result(server,False,msg)
fail_lock.acquire()
fails.append(result)
fail_lock.release()
success_lock = Lock()
fail_lock = Lock()
successes:List[Result] = list()
fails:List[Result] = list()
if __name__ == "__main__":
start_time = time.time()
threads:List[Thread] = list()
servers = generage_servers()
for server in servers:
t = Thread(target=fun,args=(server,))
t.start()
threads.append(t)
for t in threads:
t.join()
print(f"总数量:{len(servers)},成功数量:{len(successes)},失败数量:{len(fails)}")
end_time = time.time()
print(f"程序运行时间为:{end_time - start_time}秒")
fmt_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S.txt")
successes.sort(key=lambda x: x.server)
fails.sort(key=lambda x: x.server)
if not os.path.exists("log"):
os.mkdir("log")
with open(f"log/success-{fmt_time}","w",encoding="utf8") as f:
f.write(f"command:{COMMAND}\n")
for suc in successes:
content = f"{suc.server}\n"
f.write(content)
with open(f"log/fail-{fmt_time}","w",encoding="utf8") as f:
f.write(f"command:{COMMAND}\n")
for fail in fails:
content = f"{fail.server}:{fail.msg}\n"
f.write(content)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# 准备
- 修改一些变量:
PRIVATE_KEY = 私钥文件的路径 ADMIN_USER = 具有root权限的用户名,注意只要具有root权限,其命令就会以sudo执行 NORMAL_USER = 普通用户的用户名 SUDO_PASSWORD = sudo用户的密码
保与私钥对应的公钥已被添加进目标主机。
修改generage_servers()函数,生成需要控制的ip地址。
# 使用
修改COMMAND=XX,为你要执行的命令,直接运行,main.py即可。
编辑 (opens new window)
上次更新: 2024/12/04, 16:28:16