使用Python调用Nessus 接口实现自动化扫描

Johnny21 等级 794 0 0

之前在项目中需要接入nessus扫描器,研究了一下nessus的api,现在将自己的成果分享出来。
Nessus提供了丰富的二次开发接口,无论是接入其他系统还是自己实现自动化扫描,都十分方便。
同时Nessus也提供了完备的API文档,可以在 Settings->My Account->API Keys->API documentation

使用Python调用Nessus 接口实现自动化扫描

nessus图片

认证

nessus提供两种认证方式,第一种采用常规的登录后获取token的方式,在https://localhost:8834/api#/resources/session条目中可以找到这种方式,它的接口定义如下:

POST /session

{
    "username":{string},
    "password":{string}
} 

输入正确的用户名和密码,登录成功后会返回一个token
···
{ "token": {string}
} ···

在后续请求中,将token放入请求头信息中请求头的key为X-Cookie,值为 token=xxxx,例如 :X-Cookie: token=5fa3d3fd97edcf40a41bb4dbdfd0b470ba45dde04ebc37f8;,下面是获取任务列表的例子

import requests
import json
def get_token(ip, port, username, password):
    url = "https://{0}:{1}/session".format(ip, port)
    post_data = {
        'username': username,
        'password': password
    }

    respon = requests.post(url, data=post_data, verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["token"]

def get_scan_list()
    # 这里ip和port可以从配置文件中读取或者从数据库读取,这里我省略了获取这些配置值得操作
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            "X-Cookie":"token={0}".format(token),
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result 

第二种方式是使用Nessus生成的API Key,这里我们可以依次点击 Settings->My Account->API Keys-->Generate按钮,生成一个key,后续使用时填入头信息中,还是以获取扫描任务列表作为例子

def get_scan_list()
    accessKey = "XXXXXX" #此处填入真实的内容
    secretKey = "XXXXXX" #此处填入真实内容

    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accessKey, secretkey=secretKey)
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result 

对比来看使用第二种明显方便一些,因此后续例子都采用第二种方式来呈现

策略模板配置

策略模板的接口文档在 https://localhost:8834/api#/resources/policies 中。

创建策略模板

创建策略模板使用的是 策略模板的create接口,它里面有一个必须填写的参数 uuid 这个参数是一个uuid值,表示以哪种现有模板进行创建。在创建之前需要先获取系统中可用的模板。获取的接口是 /editor/{type}/templates,type 可以选填policy或者scan。这里我们填policy

一般我们都是使用模板中的 Advanced 来创建,如下图

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

下面是获取该模板uuid的方法,主要思路是获取系统中所有模板,然后根据模板名称返回对应的uuid值

def get_nessus_template_uuid(ip, port, template_name = "advanced"):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/editor/scan/templates".format(ip=ip, port=port)
    response = requests.get(api, headers=header, verify=False)
    templates = json.loads(response.text)['templates']

    for template in templates:
        if template['name'] == template_name:
            return template['uuid']
    return None 

有了这个id之后,下面来创建策略模板,这个接口的参数较多,但是很多参数都是选填项。

这个部分文档写的很简陋,很多参数不知道是干嘛用的,当时我为了搞清楚每个参数的作用,一个个的尝试,然后去界面上看它的效果,最后终于把我感兴趣的给弄明白了。
它的主体部分如下:

{
   "uuid": {template_uuid},
   "audits": {
       "feed": {
           "add": [
               {
                   "id": {audit_id},
                   "variables": {
                       "1": {audit_variable_value},
                       "2": {audit_variable_value},
                       "3": {audit_variable_value}
                   }
               }
           ]
       }
   },
   "credentials": {
       "add": {
           {credential_category}: {
               {credential_name}: [
                   {
                       {credential_input_name}: {string}
                   }
               ]
           }
       }
   },
   "plugins": {
       {plugin_family_name}: {
           "status": {string},
           "individual": {
               {plugin_id}: {string}
           }
       }
   },
   "scap": {
       "add": {
           {scap_category}: [
               {
                   {scap_input_name}: {string}
               }
           ]
       }
   },
   "settings": {
       "acls": [
           permission Resource
       ],
       //其他的减值对,这里我将他们都省略了
} 

他们与界面上配置的几个大项有对应关系,能对应的上的我给做了标记,但是有的部分对应不上。

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

settings 是给策略模板做基础配置的,包括配置扫描的端口范围,服务检测范围等等。
credentials 是配置登录扫描的,主要包括 windows、ssh、telnet等等
plugins 配置扫描使用的插件,例如服务扫描版本漏洞等等

在settings中,对应关系如下图所示

使用Python调用Nessus 接口实现自动化扫描

host scan

使用Python调用Nessus 接口实现自动化扫描

port scan

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

下面是创建扫描策略模板的实际例子:

def create_template(ip, port, **kwargs): # kwargs 作为可选参数,用来配置settings和其他项
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }
    policys = {}

    # 这里 grouppolicy_set 存储的是策略模板中各个脚本名称以及脚本是否启用的信息
    for policy in grouppolicy_set:
        enabled = "enabled" if policy.enable else "disabled"
        policys[policy.name] = {
            "status": enabled
        }

    # settings里面的各小项必须得带上,否则会创建不成功
    "settings": {
        "name": template.name,
        "watchguard_offline_configs": "",
        "unixfileanalysis_disable_xdev": "no",
        "unixfileanalysis_include_paths": "",
        "unixfileanalysis_exclude_paths": "",
        "unixfileanalysis_file_extensions": "",
        "unixfileanalysis_max_size": "",
        "unixfileanalysis_max_cumulative_size": "",
        "unixfileanalysis_max_depth": "",
        "unix_docker_scan_scope": "host",
        "sonicos_offline_configs": "",
        "netapp_offline_configs": "",
        "junos_offline_configs": "",
        "huawei_offline_configs": "",
        "procurve_offline_configs": "",
        "procurve_config_to_audit": "Saved/(show config)",
        "fortios_offline_configs": "",
        "fireeye_offline_configs": "",
        "extremeos_offline_configs": "",
        "dell_f10_offline_configs": "",
        "cisco_offline_configs": "",
        "cisco_config_to_audit": "Saved/(show config)",
        "checkpoint_gaia_offline_configs": "",
        "brocade_offline_configs": "",
        "bluecoat_proxysg_offline_configs": "",
        "arista_offline_configs": "",
        "alcatel_timos_offline_configs": "",
        "adtran_aos_offline_configs": "",
        "patch_audit_over_telnet": "no",
        "patch_audit_over_rsh": "no",
        "patch_audit_over_rexec": "no",
        "snmp_port": "161",
        "additional_snmp_port1": "161",
        "additional_snmp_port2": "161",
        "additional_snmp_port3": "161",
        "http_login_method": "POST",
        "http_reauth_delay": "",
        "http_login_max_redir": "0",
        "http_login_invert_auth_regex": "no",
        "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no",
        "never_send_win_creds_in_the_clear": "yes" if kwargs["never_send_win_creds_in_the_clear"] else "no",
        "dont_use_ntlmv1": "yes" if kwargs["dont_use_ntlmv1"] else "no",
        "start_remote_registry": "yes" if kwargs["start_remote_registry"] else "no",
        "enable_admin_shares": "yes" if kwargs["enable_admin_shares"] else "no",
        "ssh_known_hosts": "",
        "ssh_port": kwargs["ssh_port"],
        "ssh_client_banner": "OpenSSH_5.0",
        "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes",
        "region_ord_pref_name": "yes",
        "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes",
        "region_syd_pref_name": "yes",
        "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "",
        "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "",
        "aws_us_east_2": "",
        "aws_us_west_1": "",
        "aws_us_west_2": "",
        "aws_ca_central_1": "",
        "aws_eu_west_1": "",
        "aws_eu_west_2": "",
        "aws_eu_west_3": "",
        "aws_eu_central_1": "",
        "aws_eu_north_1": "",
        "aws_ap_east_1": "",
        "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "",
        "aws_ap_northeast_3": "",
        "aws_ap_southeast_1": "",
        "aws_ap_southeast_2": "",
        "aws_ap_south_1": "",
        "aws_me_south_1": "",
        "aws_sa_east_1": "",
        "aws_use_https": "yes",
        "aws_verify_ssl": "yes",
        "log_whole_attack": "no",
        "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default",
        "include_kb": "use_scanner_default",
        "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "",
        "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no",
        "network_receive_timeout": "5",
        "max_checks_per_host": "5",
        "max_hosts_per_scan": "100",
        "max_simult_tcp_sessions_per_host": "",
        "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes",
        "stop_scan_on_disconnect": "no",
        "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes",
        "reverse_lookup": "no",
        "log_live_hosts": "no",
        "display_unreachable_hosts": "no",
        "report_verbosity": "Normal",
        "report_superseded_patches": "yes",
        "silent_dependencies": "yes",
        "scan_malware": "no",
        "samr_enumeration": "yes",
        "adsi_query": "yes",
        "wmi_query": "yes",
        "rid_brute_forcing": "no",
        "request_windows_domain_info": "no",
        "scan_webapps": "no",
        "start_cotp_tsap": "8",
        "stop_cotp_tsap": "8",
        "modbus_start_reg": "0",
        "modbus_end_reg": "16",
        "hydra_always_enable": "yes" if kwargs["hydra_always_enable"] else "no",
        "hydra_logins_file": "" if kwargs["hydra_logins_file"] else kwargs["hydra_logins_file"], # 弱口令文件需要事先上传,后面会提到上传文件接口
        "hydra_passwords_file": "" if kwargs["hydra_passwords_file"] else kwargs["hydra_passwords_file"],
        "hydra_parallel_tasks": "16",
        "hydra_timeout": "30",
        "hydra_empty_passwords": "yes",
        "hydra_login_as_pw": "yes",
        "hydra_exit_on_success": "no",
        "hydra_add_other_accounts": "yes",
        "hydra_postgresql_db_name": "",
        "hydra_client_id": "",
        "hydra_win_account_type": "Local accounts",
        "hydra_win_pw_as_hash": "no",
        "hydra_cisco_logon_pw": "",
        "hydra_web_page": "",
        "hydra_proxy_test_site": "",
        "hydra_ldap_dn": "",
        "test_default_oracle_accounts": "no",
        "provided_creds_only": "yes",
        "smtp_domain": "example.com",
        "smtp_from": "nobody@example.com",
        "smtp_to": "postmaster@[AUTO_REPLACED_IP]",
        "av_grace_period": "0",
        "report_paranoia": "Normal",
        "thorough_tests": "no",
        "detect_ssl": "yes",
        "tcp_scanner": "no",
        "tcp_firewall_detection": "Automatic (normal)",
        "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)",
        "wol_mac_addresses": "",
        "wol_wait_time": "5",
        "scan_network_printers": "no",
        "scan_netware_hosts": "no",
        "scan_ot_devices": "no",
        "ping_the_remote_host": "yes",
        "tcp_ping": "yes",
        "icmp_unreach_means_host_down": "no",
        "test_local_nessus_host": "yes",
        "fast_network_discovery": "no",

        "arp_ping": "yes" if kwargs["arp_ping"] else "no",
        "tcp_ping_dest_ports": kwargs["tcp_ping_dest_ports"],
        "icmp_ping": "yes" if kwargs["icmp_ping"] else "no",
        "icmp_ping_retries": kwargs["icmp_ping_retries"],
        "udp_ping": "yes" if kwargs["udp_ping"] else "no",
        "unscanned_closed": "yes" if kwargs["unscanned_closed"] else "no",
        "portscan_range": kwargs["portscan_range"],
        "ssh_netstat_scanner": "yes" if kwargs["ssh_netstat_scanner"] else "no",
        "wmi_netstat_scanner": "yes" if kwargs["wmi_netstat_scanner"] else "no",
        "snmp_scanner": "yes" if kwargs["snmp_scanner"] else "no",
        "only_portscan_if_enum_failed": "yes" if kwargs["only_portscan_if_enum_failed"] else "no",
        "verify_open_ports": "yes" if kwargs["verify_open_ports"] else "no",
        "udp_scanner": "yes" if kwargs["udp_scanner"] else "no",
        "svc_detection_on_all_ports": "yes" if kwargs["svc_detection_on_all_ports"] else "no",
        "ssl_prob_ports": "Known SSL ports" if kwargs["ssl_prob_ports"] else "All ports",
        "cert_expiry_warning_days": kwargs["cert_expiry_warning_days"],
        "enumerate_all_ciphers": "yes" if kwargs["enumerate_all_ciphers"] else "no",
        "check_crl": "yes" if kwargs["check_crl"] else "no",
   }

    credentials = {
            "add": {
                "Host": {
                    "SSH": [],
                    "SNMPv3": [],
                    "Windows": [],
                },
                "Plaintext Authentication": {
                    "telnet/rsh/rexec": []
                }
            }
        }
        try:
            if kwargs["snmpv3_username"] and kwargs["snmpv3_port"] and kwargs["snmpv3_level"]:
                level = kwargs["snmpv3_level"]
                if level == NessusSettings.LOW:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "No authentication and no privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"]
                    })
                elif level == NessusSettings.MID:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication without privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"][1]],
                        "auth_password": kwargs["snmpv3_auth_psd"]
                    })
                elif level == NessusSettings.HIGH:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication and privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"]][1],
                        "auth_password": kwargs["snmpv3_auth_psd"],
                        "privacy_algorithm": NessusSettings.PPIVACY_ALG[kwargs["snmpv3_hide"]][1],
                        "privacy_password": kwargs["snmpv3_hide_psd"]
                    })

            if kwargs["ssh_username"] and kwargs["ssh_psd"]:
                credentials["add"]["Host"]["SSH"].append(
                    {
                        "auth_method": "password",
                        "username": kwargs["ssh_username"],
                        "password": kwargs["ssh_psd"],
                        "elevate_privileges_with": "Nothing",
                        "custom_password_prompt": "",
                    })

            if kwargs["windows_username"] and kwargs["windows_psd"]:
                credentials["add"]["Host"]["Windows"].append({
                    "auth_method": "Password",
                    "username": kwargs["windows_username"],
                    "password": kwargs["windows_psd"],
                    "domain": kwargs["ssh_host"]
                })

            if kwargs["telnet_username"] and kwargs["telnet_password"]:
                credentials["add"]["Plaintext Authentication"]["telnet/rsh/rexec"].append({
                    "username": kwargs["telnet_username"],
                    "password": kwargs["telnet_password"]
                })

    data = {
            "uuid": get_nessus_template_uuid(terminal, "advanced"),
            "settings": settings,
            "plugins": policys,
            "credentials": credentials
        }

    api = "https://{0}:{1}/policies".format(ip, port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"), # 这里做一个转码防止在nessus端发生中文乱码
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["policy_id"] # 返回策略模板的id,后续可以在创建任务时使用
    else:
        return None 

策略还有copy、delete、config等操作,这里就不再介绍了,这个部分主要弄清楚各参数的作用,后面的这些接口使用的参数都是一样的

任务

任务部分的API 在https://localhost:8834/api#/resources/scans

创建任务

创建任务重要的参数如下说明如下:

  1. uuid: 创建任务时使用的模板id,这个id同样是我们上面说的系统自带的模板id
  2. name:任务名称
  3. policy_id:策略模板ID,这个是可选的,如果要使用上面我们自己定义的扫描模板,需要使用这个参数来指定,并且设置上面的uuid为 custom 的uuid,这个值表示使用用户自定义模板;当然如果就想使用系统提供的,这个字段可以不填
  4. text_targets:扫描目标地址,这个参数是一个数组,可以填入多个目标地址,用来一次扫描多个主机

创建任务的例子如下:

def create_task(task_name, policy_id, hosts): # host 是一个列表,存放的是需要扫描的多台主机
    uuid = get_nessus_template_uuid(terminal, "custom") # 获取自定义策略的uuid
    if uuid is None:
        return False

    data = {"uuid": uuid, "settings": {
        "name": name,
        "policy_id": policy_id,
        "enabled": True,
        "text_targets": hosts,
        "agent_group_id": []
    }}

    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans".format(ip=terminal.ip, port=terminal.port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"),
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        if data["scan"] is not None:
            scan = data["scan"]
            # 新增任务扩展信息记录

            return scan["id"] # 返回任务id 

启动/停止任务

启动任务的接口为 POST /scans/{scan_id}/launch scan_id 是上面创建任务返回的任务ID, 它有个可选参数 alt_targets,如果这个参数被指定,那么该任务可以扫描这个参数中指定的主机,而之前创建任务时指定的主机将被替代

停止任务的接口为: POST /scans/{scan_id}/stop
下面给出启动和停止任务的方法

def start_task(task_id, hosts):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    data = {
        "alt_targets": [hosts] # 重新指定扫描地址
    }

    api = "https://{ip}:{port}/scans/{scan_id}/launch".format(ip=ip, port=port, scan_id=scan_id)
    response = requests.post(api, data=data, verify=False, headers=header)
    if response.status_code != 200:
        return False
    else:
        return True

def stop_task(task_id):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=terminal.reserved1,
                                                                          secretkey=terminal.reserved2),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans/{scan_id}/stop".format(ip=ip, port=port, task_id)
    response = requests.post(api, headers=header, verify=False)
    if response.status_code == 200 or response.status_code == 409: # 根据nessus api文档可以知道409 表示任务已结束
        return True

    return False 

获取扫描结果

使用接口 GET /scans/{scan_id} 可以获取最近一次扫描的任务信息,从接口文档上看,它还可以获取某次历史扫描记录的信息,如果不填这个参数,接口中会返回所有历史记录的id。如果不填历史记录id,那么会返回最近一次扫描到的漏洞信息,也就是说新扫描到的信息会把之前的信息给覆盖

下面是返回信息的部分说明

{
    "info": {
        "edit_allowed": {boolean},
        "status": {string}, //当前状态 completed 字符串表示结束,cancel表示停止
        "policy": {string},
        "pci-can-upload": {boolean},
        "hasaudittrail": {boolean},
        "scan_start": {string},
        "folder_id": {integer},
        "targets": {string},
        "timestamp": {integer},
        "object_id": {integer},
        "scanner_name": {string},
        "haskb": {boolean},
        "uuid": {string},
        "hostcount": {integer},
        "scan_end": {string},
        "name": {string},
        "user_permissions": {integer},
        "control": {boolean}
    },
    "hosts": [ //按主机区分的漏洞信息
        host Resource
    ],
    "comphosts": [
        host Resource
    ],
    "notes": [
        note Resource
    ],
    "remediations": {
        "remediations": [
            remediation Resource
        ],
        "num_hosts": {integer},
        "num_cves": {integer},
        "num_impacted_hosts": {integer},
        "num_remediated_cves": {integer}
    },
    "vulnerabilities": [
        vulnerability Resource //本次任务扫描到的漏洞信息
    ],
    "compliance": [
        vulnerability Resource
    ],
    "history": [
        history Resource //历史扫描信息,可以从这个信息中获取历史记录的id
    ],
    "filters": [
        filter Resource
    ]
} 

这个信息里面vulnerabilities和host里面都可以拿到漏洞信息,但是 vulnerabilities中是扫描到的所有漏洞信息,而host则需要根据id再次提交请求,也就是需要额外一次请求,但它是按照主机对扫描到的漏洞进行了分类。而使用vulnerabilities则需要根据漏洞信息中的host_id 手工进行分类

下面是获取任务状态的示例:

def get_task_status(task_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{task_id}".format(ip=ip, port=port,
                                                       task_id=task_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    hosts = data["hosts"]
    for host in hosts:
        get_host_vulnerabilities(scan_id, host["host_id"]) # 按主机获取漏洞信息

    if data["info"]["status"] == "completed" or data["info"]["status"] =='canceled':
        # 已完成,此时更新本地任务状态
    return 1, "OK" 

获取漏洞信息

在获取任务信息中,已经得到了本次扫描中发现的弱点信息了,只需要我们解析这个json。它具体的内容如下:

"host_id": {integer}, //主机id
"host_index": {string}, 
"hostname": {integer},//主机名称
"progress": {string}, //扫描进度
"critical": {integer}, //危急漏洞数
"high": {integer}, //高危漏洞数
"medium": {integer}, //中危漏洞数
"low": {integer}, //低危漏洞数
"info": {integer}, //相关信息数目
"totalchecksconsidered": {integer},
"numchecksconsidered": {integer},
"scanprogresstotal": {integer},
"scanprogresscurrent": {integer},
"score": {integer} 

根据主机ID可以使用 GET /scans/{scan_id}/hosts/{host_id} 接口获取主机信息,它需要两个参数,一个是扫描任务id,另一个是主机id。
下面列举出来的是返回值得部分内容,只列举了我们感兴趣的部分:

{
    "info": {
        "host_start": {string},
        "mac-address": {string},
        "host-fqdn": {string},
        "host_end": {string},
        "operating-system": {string},
        "host-ip": {string}
    },

    "vulnerabilities": [
        {
            "host_id": {integer}, //主机id
            "hostname": {string}, //主机名称
            "plugin_id": {integer}, //策略id
            "plugin_name": {string}, //策略名称
            "plugin_family": {string}, //所属策略组
            "count": {integer}, //该种漏洞数
            "vuln_index": {integer}, 
            "severity_index": {integer},
            "severity": {integer}
        }
    ],
} 

根据上面获取任务信息中得到的主机id和任务id,我们可以实现这个功能

def get_host_vulnerabilities(scan_id, host_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    scan_history = ScanHistory.objects.get(id=scan_id)
    api = "https://{ip}:{port}/scans/{task_id}/hosts/{host_id}".format(ip=ip, port=port, task_id=scan_id, host_id=host_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    vulns = data["vulnerabilities"]
    for vuln in vulns:
        vuln_name = vuln["plugin_name"]
        plugin_id = vuln["plugin_id"] #插件id,可以获取更详细信息,包括插件自身信息和扫描到漏洞的解决方案等信息
        #保存漏洞信息 

获取漏洞输出信息与漏洞知识库信息

我们在nessus web页面中可以看到每条被检测到的漏洞在展示时会有输出信息和知识库信息,这些信息也可以根据接口来获取

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

获取漏洞的知识库可以通过接口 GET /scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id} , 它的路径为: https://localhost:8834/api#/resources/scans/plugin-output

它返回的值如下:

{
    "info": {
        "plugindescription": {
            "severity": {integer}, //危险等级,从info到最后的critical依次为1,2,3,4,5
            "pluginname": {string}, 
            "pluginattributes": {
                "risk_information": {
                    "risk_factor": {string}
                },
                "plugin_name": {string}, //插件名称
                "plugin_information": {
                    "plugin_id": {integer},
                    "plugin_type": {string},
                    "plugin_family": {string},
                    "plugin_modification_date": {string}
                },
                "solution": {string}, //漏洞解决方案
                "fname": {string},
                "synopsis": {string},
                "description": {string} //漏洞描述
            },
            "pluginfamily": {string},
            "pluginid": {integer}
        }
    },
    "output": [
        plugin_output:{
           "plugin_output": {string}, //输出信息
           "hosts": {string}, //主机信息
           "severity": {integer}, 
           "ports": {} //端口信息
        }
    ]
} 

有了这些信息,我们可以通过下面的代码获取这些信息:

def get_vuln_detail(scan_id, host_id, plugin_id)
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}".format(ip=ip, port=port, scan_id=scan_id, host_id=host_id, plugin_id=plugin_id)
    response = requests.get(api, headers=header, verify=False)
    data = json.loads(response.text)
    outputs = data["outputs"]
    return outputs 

最后总结

这篇文章我们主要介绍了nessus API从扫描设置到扫描任务创建、启动、停止、以及结果的获取的内容,当然nessus的api不止这些,但是最重要的应该是这些,如果能帮助各位解决手头上的问题自然是极好的,如果不能或者说各位朋友需要更细致的控制,可以使用浏览器抓包的方式来分析它的请求和响应包。
在摸索时最好的两个帮手是浏览器 F12工具栏中的 network和nessus api文档页面上的test工具了。

我们可以先按 f12 打开工具并切换到network,然后在页面上执行相关操作,观察发包即可发现该如何使用这些API,因为Nessus Web端在操作时也是使用API。如下图:

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

或者可以使用文档中的test工具,例如下面是测试 获取插件输出信息的接口

使用Python调用Nessus 接口实现自动化扫描

在这里插入图片描述

本文转自 https://www.jianshu.com/p/a4236809e11a,如有侵权,请联系删除。

收藏
评论区

相关推荐

python中的异常处理
异常 异常就是程序运行时发生错误的信号,在python中,错误触发的异常如下 异常种类   在python中不同的异常可以用不同的类型(python
15. Python 程序运行速度如何提高十倍?第一遍滚雪球学 Python 收工
本篇文章将给大家介绍 Python 多线程与多进程相关知识,学习完该知识点之后,你的 Python 程序将进入另一个高峰。 <center<font colorred缓解一下视疲劳</font</center 15. Python 程序运行速度如何提高十倍?第一遍滚雪球学 Python 收工(https://imghelloworld.oss
kali2020.3安装Nessus8.12.1并解除IP限制
1、准备 (1)获得激活码 地址:https://zhcn.tenable.com/products/nessus/nessusessentials https://zhcn.tenable.com/products/nessus/activationcode?tns\_redirecttrue(https://zhcn.tenable
运维监控系统——Zabbix简介
前言对于运维人员来说,监控是非常重要的,因为如果想要保证线上业务整体能够稳定运行,那么我们则需要实时关注与其相关的各项指标是否正常,而一个业务系统的背后,往往存在着很多的服务器、网络设备等硬件资源,如果我们想要能够更加方便的、集中的监
基于Python的nessus API简析
前言最近在开发一个基于Nessus的自动化漏扫工具,来和大家分析一下关于Nessus API的使用心得。 Nessus提供了非常完善的API,可以帮助我们实现很多事情,无论是对接其他运维系统,还是用来编写自动化的漏扫工具都十分方便。 Nessus为这些api提供了详细的文档,你可以在SettingsMy AccountAPI KeysAP
运维安全-信息安全
本文转自 ,如有侵权,请联系删除。
使用Python调用Nessus 接口实现自动化扫描
之前在项目中需要接入nessus扫描器,研究了一下nessus的api,现在将自己的成果分享出来。 Nessus提供了丰富的二次开发接口,无论是接入其他系统还是自己实现自动化扫描,都十分方便。 同时Nessus也提供了完备的API文档,可以在 SettingsMy AccountAPI KeysAPI documentation nessu
Nessus漏扫
Nessus号称是世界上最流行的漏洞扫描程序,全世界有超过75000个组织在使用它。该工具提供完整的电脑漏洞扫描服务,并随时更新其漏洞数据库。Nessus不同于传统的漏洞扫描软件,Nessus可同时在本机或远端上遥控,进行系统的漏洞分析扫描。Nessus也是渗透测试重要工具之一。所以,本章将介绍安装、配置并启动Nessus。 5.1.1 安装和配置Ne
数据库运维做些什么?
一. 数据库生命周期 结合软件生命周期、项目的开展,数据库的生命周期大致可分为这么几个阶段。 (https://imghelloworld.osscnbeijing.aliyuncs.com/8552b8c2942bb8ce23
Linux使用 常见经验和技巧总结
1.XShell连接远程Linux服务器并运行程序后关闭XShell继续执行XShell是用来连接远程Linux很好的工具,在连接之后并运行好需要运行的程序后,往往需要关闭XShell,但是直接运行比如运行python crawler.py运行一个Python爬虫程序后,并直接关闭XShell往往会同时杀掉正在运行的爬虫程序,因此需要使用sudo
python文件的第一行 #!/usr/bin/python3 是什么意思?
python文件的第一行代码通常在脚本语言的第一行会看到: !/usr/bin/env python或 !/usr/bin/python 首先要确定的一点是它不是注释。这两句话的目的都是指出你的python文件用什么可执行程序去运行它。1. !/usr/bin/python 是告诉操作系统执行这个脚本的时候,调用 /usr/bin 下的 python 解释
怎么学python,学习python的正确姿势
Python是一门相对来说比较简单的编程语言,自学是非常轻松的。首先得明白python有哪些发展方向 需要了解这个这门语言 而不是听说 这个高薪资 容易学习最好的学习状态就是出于兴趣 兴趣是最好的老师 当然对钱感兴趣也是可以的。一、人工智能二、大数据三、网络爬虫工程师四、Python web全栈工程师五、Python自动化运维六、Python自动化测试再来说
一篇文章带你了解Python运算符重载
您可以根据所使用的操作数来更改Python中运算符的含义。这种做法称为运算符重载,今天我们一起来聊聊运算符重载。 一、什么是Python中的运算符重载? Python运算符用于内置类。但是相同的运算符对不同的类型有不同的行为。例如,+运算符将对两个数字执行算术加法、合并两个列表并连接两个字符串。Python中的这一功能允许同一运算符根据上下文具有不同的含
一篇文章带你了解Python运算符重载
您可以根据所使用的操作数来更改Python中运算符的含义。这种做法称为运算符重载,今天我们一起来聊聊运算符重载。 一、什么是Python中的运算符重载? Python运算符用于内置类。但是相同的运算符对不同的类型有不同的行为。例如,+运算符将对两个数字执行算术加法、合并两个列表并连接两个字符串。Python中的这一功能允许同一运算符根据上下文具有不同的含
我们也从 Python 转向了 Golang -- MemFireDB
首先说明一下,Python 也是我最喜欢的一门编程语言,我用 Python 工作了接近 8 年,并且会一直使用下去。我们团队在开启 这个项目之初就做出了从 Python 往 golang 转换的预期,因此我们的转换过程没有任何障碍,非常顺利的就完成了。我们为什么会在项目开启之初就做出要更换编程语言的决定呢,为什么不一开始就选择 Golang 呢?第一个问题