目录 CONTENT

文章目录

自动升级

懿曲折扇情
2022-07-15 / 3 评论 / 10 点赞 / 312 阅读 / 1,079 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-07-15,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
广告 广告

自动升级

# coding=utf-8
"""
    @Project :pachong-master
    @File    :common.py
    @Author  :gaojs
    @Date    :2022/7/15 14:01
    @Blogs   : https://www.gaojs.com.cn
"""
import logging
import re
import socket
import sys
import time
from time import sleep

import paramiko
import requests
import schedule
from faker import Factory
from selenium import webdriver
from selenium.webdriver.chrome.options import Options


# Raise the interpreter's recursion limit to 5000 frames.
# NOTE(review): nothing in this file recurses; presumably this was added for
# a dependency or an earlier version of the script -- confirm it is still needed.
sys.setrecursionlimit(5000)


class CLI:
    """SSH command-line session to an AG/NetIAG appliance, plus upgrade helpers.

    Call :meth:`ssh_ag` first: it creates ``self.ssh``, ``self.channel`` and the
    ``log.txt`` transcript (``self.fin``) that every other method relies on.
    Call :meth:`close` when the session is no longer needed.
    """

    # Build-server directory that lists the NetIAG upgrade images.
    BUILDS_URL = 'http://192.168.120.204/builds/netIAG/'

    def ssh_ag(self, hostname='192.168.120.6', port=22, username='array', password='admin'):
        """Connect to the appliance over SSH and prepare an interactive shell.

        Opens ``log.txt`` (truncating it) as the session transcript, connects,
        reads the login banner and then sends ``enable``, an empty line,
        ``config ter`` and ``no page``.
        NOTE(review): presumably these enter privileged/config mode and turn
        off output paging -- confirm against the appliance CLI manual.

        :param hostname: management address of the appliance
        :param port: SSH port
        :param username: SSH login name
        :param password: SSH password
        :raises Exception: whatever paramiko or the socket layer raises; the
            log file and SSH client are closed before the error propagates.
        """
        self.logging = logging
        # The transcript is decoded from UTF-8, so write it back as UTF-8
        # regardless of the platform's default encoding.
        self.fin = open('log.txt', 'w', encoding='utf-8')
        self.ssh = paramiko.SSHClient()
        try:
            # Accept hosts that are not in known_hosts. This skips host-key
            # verification, so it is only appropriate on a trusted network.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
            self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
            sleep(5)
            channel = self.ssh.invoke_shell()
            self.channel = channel
            channel.settimeout(5)
            output = channel.recv(2048).decode('utf-8', errors='replace')
            time.sleep(1)
            self.fin.write(output)
            self.cli_cmd('enable')
            self.cli_cmd('')
            self.cli_cmd('config ter')
            self.cli_cmd('no page')
        except Exception:
            # Do not leak the log file / socket when the login sequence fails.
            self.close()
            raise

    def close(self):
        """Close the shell channel, the SSH client and the transcript file.

        Safe to call more than once and before :meth:`ssh_ag`; resources that
        were never created are skipped.
        """
        for name in ('channel', 'ssh', 'fin'):
            resource = getattr(self, name, None)
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Best-effort cleanup: report and keep closing the rest.
                logging.debug('failed to close %s', name, exc_info=True)

    def print_step(self):
        """Read up to 2048 bytes from the channel and print them decoded."""
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt=r"[#\$]", timeout=3):
        """Send one command line and return the echo up to the prompt.

        :param cli: command text; a newline is appended before sending
        :param prompt: regular expression that marks the end of the output
        :param timeout: seconds to wait for ``prompt`` (see :meth:`read_until`)
        :return: everything received until the prompt matched or time ran out
        """
        self.logging.info("AG execute command in enable mode:  %s", cli)
        self.channel.send(cli + '\n')
        return self.read_until(prompt, timeout)

    def quit_enable(self):
        """Send ``switch global`` followed by ``exit``."""
        self.cli_cmd('switch global')
        self.cli_cmd('exit')

    def clear_log(self):
        """Wait two seconds, then send ``clear log b``."""
        sleep(2)
        self.cli_cmd('clear log b')

    def read_until(self, expected, timeout=10):
        """Collect channel output until ``expected`` matches or time runs out.

        Every received chunk is appended to the transcript file immediately.

        :param expected: regular expression searched in the accumulated output
        :param timeout: overall deadline in seconds; a single ``recv`` may still
            block for the channel's own timeout before the deadline is checked
        :return: the accumulated output (possibly without a match)
        """
        output = ''
        regexp = re.compile(expected)
        max_time = time.time() + timeout
        while time.time() < max_time:
            try:
                data = self.channel.recv(1024)
            except socket.timeout:
                # Nothing arrived within the channel timeout; keep waiting.
                continue
            except (OSError, EOFError) as err:
                # Transport failure: return what we have instead of spinning.
                self.logging.warning('channel read failed: %s', err)
                break
            if not data:
                # An empty read means the remote side closed the channel.
                break
            # errors='replace' keeps chunks that split a multi-byte character
            # instead of discarding the whole chunk.
            tmp = data.decode('utf-8', errors='replace')
            self.fin.write(tmp)
            self.fin.flush()
            output += tmp
            if regexp.search(output):
                break
        return output

    def switch_vsite(self, vsite):
        """Switch to a virtual site by sending ``sw <vsite>``.

        :param vsite: name of the virtual site
        """
        self.cli_cmd('sw %s' % vsite)

    def get_sn(self):
        """Return the 31-character serial number from ``show version``.

        :return: the serial number without surrounding whitespace
        :raises IndexError: if the output contains no ``Serial Number`` line
        """
        resp = self.cli_cmd('show version')
        found = re.search(r'Serial Number : ([A-Z0-9]{31})', resp)
        if found is None:
            raise IndexError('no "Serial Number" found in "show version" output')
        # Capture only the serial itself; splitting on ':' used to keep the
        # leading space in the returned value.
        return found.group(1)

    def create_test_password(self, sn, timeout=30):
        """Request the ``test`` account password for a serial number.

        :param sn: appliance serial number, posted as the ``serial`` field
        :param timeout: seconds to wait for the password service
        :return: the password extracted from the response body
        :raises IndexError: if the response contains no password
        """
        url = 'http://10.3.0.50/cgi-bin/passwd_res'
        headers = {'User-Agent': Factory.create().user_agent()}
        data = {'serial': sn}
        # Without a timeout an unreachable service would block forever.
        rsp = requests.post(url=url, headers=headers, data=data, timeout=timeout)
        matches = re.findall('password: (.*?)</pre>', rsp.text)
        if not matches:
            raise IndexError('no password found in the password service response')
        passwd = matches[0]
        print(passwd)
        return passwd

    def update_ag(self, latest_builds, wait_seconds=1800):
        """Start a system update to the given build and wait for it.

        NOTE(review): unlike :meth:`auto_update_netiag` this does not send
        ``YES`` after the update command -- confirm whether the appliance
        asks for confirmation here too.

        :param latest_builds: image file name under :attr:`BUILDS_URL`
        :param wait_seconds: how long to sleep while the update runs
        """
        self.cli_cmd('config terminal')
        self.cli_cmd(f'system update "{self.BUILDS_URL}{latest_builds}"')
        time.sleep(wait_seconds)

    def get_current_release(self):
        """Return the running release with dots replaced by underscores.

        :return: e.g. ``Rel_NetIAG_9_4_0_12``
        :raises IndexError: if ``show version`` shows no matching release
        """
        resp = self.cli_cmd('show version')
        # NOTE(review): the dots are unescaped (match any character) and the
        # last component must be exactly two digits -- confirm against real
        # "show version" output before tightening this pattern.
        pattern = re.compile(r'Rel.NetIAG.[0-9].[0-9].[0-9].[0-9][0-9]')
        found = pattern.search(resp)
        if found is None:
            raise IndexError('no release string found in "show version" output')
        curr_release = found.group(0).replace('.', '_')
        print(f'***************** 您当前AG的版本是 {curr_release} ***********************')
        return curr_release

    def get_latest_release(self):
        """Return the newest image name listed on the build server.

        Loads :attr:`BUILDS_URL` in headless Chrome and reads the second cell
        of the second-to-last table row.

        :return: text of that cell (the image file name)
        """
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(self.BUILDS_URL)
            driver.maximize_window()
            # find_element('xpath', ...) works on Selenium 3 and 4, whereas
            # find_element_by_xpath was removed in Selenium 4.3.
            latest_release = driver.find_element('xpath', '//tbody/tr[last()-1]/td[2]').text
        finally:
            # Always end the browser; otherwise every run leaks a Chrome process.
            driver.quit()
        print(f'***************** 204 server上最新的netiag版本是 {latest_release} ***********************')
        return latest_release

    def auto_update_netiag(self, wait_seconds=1200):
        """Upgrade the appliance when the build server has a different image.

        Compares the server's newest image name with
        ``nodebug-<current release>.click`` and, if they differ, sends the
        ``system update`` command, confirms with ``YES`` and waits.

        :param wait_seconds: how long to sleep while the upgrade runs
        """
        server_latest_release = self.get_latest_release()
        if not server_latest_release:
            # An empty name would produce a bogus update URL; do nothing.
            logging.warning('build server returned no release name; skipping update')
            return
        curr_release = self.get_current_release()
        current_release_name = 'nodebug-' + curr_release + '.click'
        # Exact comparison: `not in (name)` was a substring test on a string,
        # not a membership test on a tuple.
        if server_latest_release != current_release_name:
            print(current_release_name)
            self.cli_cmd(f'system update "{self.BUILDS_URL}{server_latest_release}"')
            self.cli_cmd('YES')
            print(f'**************************** 正在升级 {server_latest_release} 版本的netiag,请耐心等候 *****************************')
            time.sleep(wait_seconds)
        print(f'**************** 当前版本 {server_latest_release} 已经是最新版本 **********************')


# Scheduled job: run the upgrade check every day at 19:00.
def _run_scheduled_upgrade(hostname='192.168.120.7'):
    """Open a fresh SSH session, run the auto-upgrade once, then clean up.

    A new session is created for every run because a session opened once at
    start-up is unlikely to survive until the next day (the appliance
    presumably restarts after an upgrade). Errors are logged rather than
    raised so that one failed night does not stop the scheduler loop.

    :param hostname: management address of the appliance to upgrade
    """
    session = CLI()
    try:
        session.ssh_ag(hostname=hostname)
        session.auto_update_netiag()
    except Exception:
        logging.exception('scheduled NetIAG upgrade on %s failed', hostname)
    finally:
        # Release the SSH client and the transcript file opened by ssh_ag.
        for attr in ('ssh', 'fin'):
            resource = getattr(session, attr, None)
            if resource is not None:
                resource.close()


schedule.every().day.at("19:00").do(_run_scheduled_upgrade)
while True:
    # Run any job whose time has come, then poll again in a second.
    schedule.run_pending()
    time.sleep(1)

# if __name__ == '__main__':
#     """
#     自动获取test密码
#     """
#     # ag_ip = input('请输入您AG管理IP:')
#     # test = CLI()
#     # test.ssh_ag(hostname=ag_ip)
#     # sn_number = test.get_sn()
#     # passwd = test.create_test_password(sn_number)
#     # print(f'*********************** 您的test账户密码是 {passwd} ************************\n')



image-1657883013429

10

评论区