diff --git a/qualysapi/api_actions.py b/qualysapi/api_actions.py
index 48d1d33..7f4cd3f 100644
--- a/qualysapi/api_actions.py
+++ b/qualysapi/api_actions.py
@@ -2,6 +2,7 @@
import time
from urllib import parse as urlparse
import re
+import ipaddress
# from lxml import objectify
import xml.etree.ElementTree as ET
from qualysapi.api_objects import *
@@ -10,6 +11,26 @@
child_tags_list = None
logger = logging.getLogger(__name__)
class QGActions:
+ def ruleValidator(self, rule_type: str, rule_body: str):
+ #TODO: validator for the harder rule types
+ match rule_type:
+ case 'NETWORK_RANGE':
+ #IP range validation
+ addresses = rule_body.split('-')
+ valid_addresses = 0
+ for address in addresses:
+ try:
+ ip = ipaddress.IPv4Address(address)
+ valid_addresses += 1
+ except ValueError:
+ return False
+ return valid_addresses > 0
+ case 'GLOBAL_ASSET_VIEW':
+ #idk honestly, but somehow i can i guess???
+ return True #testing lol
+ print(rule_body)
+
+
def getHost(self, host_name=None, host_id=None, verbose=False):
if verbose:
call = 'rest/2.0/get/am/asset'
@@ -480,42 +501,6 @@ def listScans(self, launched_after="", state="", target="", type="", user_login=
return scanArray
-# don't need this anymore I reckon
-# def listChildTags(self, tag_name=None, tag_id=None, filename=None):
-# if tag_id:
-# files = (
-# """
-#
-# """
-# + tag_id
-# + """
-#
-# """
-# )
-# elif filename:
-# files = open(filename, "rb").read()
-# elif tag_name:
-# files = (
-# """
-#
-# """
-# + tag_name
-# + """
-#
-# """
-# ).encode("ascii", "ignore")
-
-# call = "/qps/rest/2.0/search/am/tag"
-# parameters = files
-# response = objectify.fromstring(
-# self.request(call, parameters, api_version=2, http_method="post").encode("utf-8")
-# )
-# childs = list()
-# for child in response.getchildren()[3][0].Tag.children.list.getchildren():
-# childs.append(child.getchildren())
-
-# return childs
-
def launchScan(self, title, option_title, iscanner_name, asset_groups="", ip=""):
# TODO: Add ability to scan by tag.
call = "/api/2.0/fo/scan/"
@@ -618,10 +603,8 @@ def listAppliances(self):
return scanner_array
- def getTag(self, tag_name=None,tag_id=None):
+ def getTag(self, tag_name: str | None = None,tag_id: int | None = None):
#TODO: fix recursion where multiple layers of child tags exist
- #TODO: enable searching by all types of search parameters through arguments passed
- #TODO: retrieve additional attributes such as rule for dynamic tags, criticality
call = "search/am/tag"
if (tag_name is not None) and (tag_id is not None):
logger.error('Error: unable to search, both tag name and id provided')
@@ -634,21 +617,24 @@ def getTag(self, tag_name=None,tag_id=None):
tree =tagData.find('data')
for item in tree.findall('Tag'):
item_data = {}
- item_data["id"] = item.find("id").text if item.find('id') is not None else None
- item_data["name"] = item.find("name").text if item.find('name') is not None else None
- item_data["created"] = item.find("created").text if item.find('created') is not None else None
- item_data["modified"] = item.find("modified").text if item.find('modified') is not None else None
- item_data["colour"] = item.find("color").text if item.find('color') is not None else None
- item_data['description'] = item.find('description').text if item.find('description') is not None else None
- item_data['has_children'] = item.find('children') if item.find('children') is not None else None
- if item_data['has_children'] is not None:
+ item_data["id"] = int(item.find("id").text) if item.find('id') is not None else None
+ item_data["name"] = str(item.find("name").text) if item.find('name') is not None else None
+ item_data["created"] = str(item.find("created").text) if item.find('created') is not None else None
+ item_data["modified"] = str(item.find("modified").text) if item.find('modified') is not None else None
+ item_data["colour"] = str(item.find("color").text) if item.find('color') is not None else None
+ item_data['description'] = str(item.find('description').text) if item.find('description') is not None else None
+ item_data['has_children'] = True if item.find('children') is not None else False
+ item_data['rule_type'] = str(item.find('ruleType').text) if item.find('ruleType') is not None else None
+ item_data['rule_value'] = str(item.find('ruleText').text) if item.find('ruleText') is not None else None
+ item_data['criticality'] = int(item.find('criticalityScore').text) if item.find('criticalityScore') is not None else None
+ if item_data['has_children']:
self.child_tags_list = []
for list in tree.iter('children'):
for items in list.iter('list'):
for tags in items.iter('TagSimple'):
- single_tag = tags.find('id').text if item.find('id') is not None else None
+ single_tag = int(tags.find('id').text) if item.find('id') is not None else None
if single_tag is not None:
- tag = self.getTag(id=single_tag)
+ tag = self.getTag(tag_id=single_tag)
self.child_tags_list.append(tag)
return Tag(
name=item_data["name"],
@@ -658,6 +644,9 @@ def getTag(self, tag_name=None,tag_id=None):
modified=item_data["modified"],
description=item_data['description'],
child_tags=self.child_tags_list,
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
)
else:
return Tag(
@@ -667,36 +656,59 @@ def getTag(self, tag_name=None,tag_id=None):
created=item_data["created"],
modified=item_data["modified"],
description=item_data['description'],
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
)
if items_found > 1:
- #TODO: return multiple tags for name-based search?
value = str(tag_id) if tag_id is not None else tag_name
- logger.warning(f'Warning: multiple results returned for tag: {value}')
+ logger.warning(f'Warning: multiple results returned for tag: {value}, use findTags() instead')
return None #for now...
if items_found < 1:
value = str(tag_id) if tag_id is not None else tag_name
logger.warning(f'Warning: unable to find tag: {value}')
return None
- def editTag(self, tag: Tag, new_name=None, new_colour=None):
- #TODO: allow passing attributes as dict of attributes
- #TODO: add additional editable attributes to function
+ def editTag(self, tag: Tag, name: str | None = None, colour: str | None = None, criticality: int | None = None,rule_type: str | None = None,rule_text: str | None = None,child_tags: list | None = None,child_tag_action: str | None = None,description: str | None = None):
call = f'update/am/tag/{str(tag.id)}'
- if new_colour is not None:
+ parameters = """"""
+ if colour is not None:
colour_validation = re.compile(r'#([A-Fa-f0-9]){6}')
- if not colour_validation.fullmatch(new_colour):
- logger.error(f'Error: colour is not valid hex code: {new_colour}')
+ if not colour_validation.fullmatch(colour):
+ logger.error(f'Error: colour is not valid hex code: {colour}')
return None
else:
- if new_name is not None:
- parameters = f"""{new_name}{new_colour}"""
- else:
- parameters = f"""{new_colour}"""
- elif (new_name is not None) and (new_colour is None):
- parameters = f"""{new_name}"""
- else:
- logger.error('Error: Colour and name both None')
- return None
+ parameters += f"""{colour}"""
+ if name is not None:
+ parameters +=f"""{name}"""
+ if criticality is not None and int(criticality) < 6 and int(criticality) > 0:
+ parameters +=f"""{int(criticality)}"""
+ if description is not None:
+ parameters += f"""{description}"""
+ if rule_type is not None and rule_text is not None:
+ if rule_type in ("STATIC","GROOVY","OS_REGEX","NETWORK_RANGE","NAME_CONTAINS","INSTALLED_SOFTWARE","OPEN_PORTS","VULN_EXIST","ASSET_SEARCH","CLOUD_ASSET","BUSINESS_INFORMATION","GLOBAL_ASSET_VIEW","NETWORK_RANGE","TAG_SET") and self.ruleValidator(rule_type,rule_text):
+ parameters += f"""{rule_type}{rule_text}"""
+ else:
+ logger.error(f'Error: Rule could not be validated: {rule_type} with value {rule_text}')
+ return None
+ if child_tag_action in ('set','remove') and child_tags is not None:
+ if child_tag_action == 'set':
+ parameters += f""""""
+ for item in child_tags:
+ if type(item) == str: #Creates a new tag (thanks API doco for not mentioning this)
+ parameters += f"""{item}"""
+ elif type(item) == Tag: #Adds an existing tag as a child
+ parameters += f"""{item.id}"""
+ parameters += f""""""
+ if child_tag_action == 'remove':
+ parameters += f""""""
+ for item in child_tags:
+ if type(item) == Tag:
+ parameters += f"""{item.id}"""
+ elif type(item) == int:
+ parameters += f"""{int(item)}"""
+ parameters += f""""""
+ parameters += """"""
tagData = ET.fromstring(self.request(api_call=call,http_method="POST",data=parameters,api_version="gav").encode("utf-8"))
for item in tagData.findall('responseCode'):
if item.text == 'SUCCESS':
@@ -704,15 +716,13 @@ def editTag(self, tag: Tag, new_name=None, new_colour=None):
tag_id = None
for item in tree.findall('Tag'):
tag_id = item.find('id').text if item.find('id') is not None else None
- return self.getTag(tag_id=tag_id)
+ return self.getTag(tag_id=tag_id) #required as returned tag on success does not contain all necessary attributes
else:
logger.error('Error: Tag failed to update')
return None
- def createTag(self, name: str, colour=None):
- #TODO: Validation of creation
- #TODO: ability to create tags with criticality, child tags, dynamic rules
- #TODO: allow passing attributes for tag as dict of attribs
+ def createTag(self, name: str, colour: str | None = None, criticality: int | None = None,rule_type: str | None = None,rule_text: str | None = None,child_tags: list | None = None,description: str | None = None):
+ #TODO: allow passing attributes for tag as dict of attribs?
call = 'create/am/tag'
colour_validation = re.compile(r'#([A-Fa-f0-9]){6}')
if colour is None:
@@ -720,25 +730,34 @@ def createTag(self, name: str, colour=None):
if not colour_validation.fullmatch(colour):
logger.error(f'Error: colour provided is not valid hex code: {colour}')
return None
- parameters = f"""{name}{colour}"""
+ parameters = f"""{name}{colour}"""
+ if criticality is not None and int(criticality) < 6 and int(criticality) > 0:
+ parameters +=f"""{int(criticality)}"""
+ if description is not None:
+ parameters += f"""{description}"""
+ if rule_type is not None and rule_text is not None:
+ if rule_type in ("STATIC","GROOVY","OS_REGEX","NETWORK_RANGE","NAME_CONTAINS","INSTALLED_SOFTWARE","OPEN_PORTS","VULN_EXIST","ASSET_SEARCH","CLOUD_ASSET","BUSINESS_INFORMATION","GLOBAL_ASSET_VIEW","NETWORK_RANGE","TAG_SET") and self.ruleValidator(rule_type,rule_text):
+ parameters += f"""{rule_type}{rule_text}"""
+ else:
+ logger.error(f'Error: Rule could not be validated: {rule_type} with value {rule_text}')
+ return None
+ if child_tags is not None:
+ parameters += f""""""
+ for item in child_tags:
+ if type(item) == Tag: #this adds an existing tag as a child
+ parameters += f"""{item.id}"""
+ elif type(item) == str: #this creates a tag with {name}, even if such a tag exists
+ parameters += f"""{item}"""
+ parameters += f""""""
+ parameters += f""""""
+
tagData = ET.fromstring(self.request(api_call=call,http_method="POST",data=parameters,api_version="gav").encode("utf-8"))
for item in tagData.findall('responseCode'):
if item.text == 'SUCCESS':
tree =tagData.find('data')
for item in tree.findall('Tag'):
- item_data = {}
- item_data["id"] = item.find("id").text if item.find('id') is not None else None
- item_data["name"] = item.find("name").text if item.find('name') is not None else None
- item_data["created"] = item.find("created").text if item.find('created') is not None else None
- item_data["modified"] = item.find("modified").text if item.find('modified') is not None else None
- item_data["colour"] = item.find("color").text if item.find('color') is not None else None
- return Tag(
- item_data["name"],
- item_data["id"],
- item_data["colour"],
- item_data["created"],
- item_data["modified"],
- )
+ id = int(item.find("id").text) if item.find('id') is not None else None
+ return self.getTag(tag_id=id)
else:
logger.error(f'Error: unable to create tag: {name}')
return None
@@ -758,4 +777,158 @@ def deleteTag(self, tag: Tag):
logger.error(f'Error: deletion failed for tag {tag.name}')
return False
else:
- return False
\ No newline at end of file
+ return False
+
+ def findTags(self, criteria, operator, search_value):
+ call = "search/am/tag"
+ parameters= f""""""
+ match criteria:
+ case 'id':
+ #I'm not really sure why you'd want to use this one like this but I'll build it anyway
+ logger.info(f'Criteria: {criteria}')
+ if operator in ('EQUALS','NOT EQUALS','GREATER','LESSER') and type(search_value) == int:
+ parameters += f"""{search_value}"""
+ elif type(search_value) == int:
+ logger.error(f'Error: invalid operator: {operator} for criteria: {criteria}')
+ return None
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ case 'name':
+ #check operators
+ logger.info(f'Criteria: {criteria}')
+ if operator in ('EQUALS','NOT EQUALS','CONTAINS'):
+ parameters += f"""{search_value}"""
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ case 'parent':
+ #check for id
+ logger.info(f'Criteria: {criteria}')
+ if type(search_value) == int:
+ parameters += f"""{search_value}"""
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ case 'ruleType':
+ #check operators
+ logger.info(f'Criteria: {criteria}')
+ if criteria in ("STATIC","GROOVY","OS_REGEX","NETWORK_RANGE","NAME_CONTAINS","INSTALLED_SOFTWARE","OPEN_PORTS","VULN_EXIST","ASSET_SEARCH","CLOUD_ASSET","BUSINESS_INFORMATION","GLOBAL_ASSET_VIEW","NETWORK_RANGE","TAG_SET") and operator in ('EQUALS','NOT EQUALS'):
+ parameters += f"""{search_value}"""
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ case 'provider':
+ #check against list of providers
+ logger.info(f'Criteria: {criteria}')
+ if criteria in ('EC2', 'AZURE', 'GCP', 'IBM', 'OCI', 'Alibaba') and operator in ('EQUALS','NOT EQUALS'):
+ parameters += f"""{search_value}"""
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ case 'color':
+ logger.info(f'Criteria: {criteria}')
+ colour_validation = re.compile(r'#([A-Fa-f0-9]){6}')
+ if colour_validation.fullmatch(search_value) and operator in ('EQUALS','NOT EQUALS'):
+ parameters += f"""{search_value}"""
+ else:
+ logger.error(f'Error: invalid query \"{criteria} {operator} {search_value}\"')
+ parameters += f""""""
+
+ tagData = ET.fromstring(self.request(api_call=call,http_method="POST",data=parameters,api_version="gav").encode("utf-8"))
+ items_found = int(tagData.find('count').text)
+ if items_found == 1:
+ tree =tagData.find('data')
+ for item in tree.findall('Tag'):
+ item_data = {}
+ item_data["id"] = int(item.find("id").text) if item.find('id') is not None else None
+ item_data["name"] = str(item.find("name").text) if item.find('name') is not None else None
+ item_data["created"] = str(item.find("created").text) if item.find('created') is not None else None
+ item_data["modified"] = str(item.find("modified").text) if item.find('modified') is not None else None
+ item_data["colour"] = str(item.find("color").text) if item.find('color') is not None else None
+ item_data['description'] = str(item.find('description').text) if item.find('description') is not None else None
+ item_data['has_children'] = True if item.find('children') is not None else False
+ item_data['rule_type'] = str(item.find('ruleType').text) if item.find('ruleType') is not None else None
+ item_data['rule_value'] = str(item.find('ruleText').text) if item.find('ruleText') is not None else None
+ item_data['criticality'] = int(item.find('criticalityScore').text) if item.find('criticalityScore') is not None else None
+ if item_data['has_children']:
+ self.child_tags_list = []
+ for list in tree.iter('children'):
+ for items in list.iter('list'):
+ for tags in items.iter('TagSimple'):
+ single_tag = int(tags.find('id').text) if item.find('id') is not None else None
+ if single_tag is not None:
+ tag = self.getTag(tag_id=single_tag)
+ self.child_tags_list.append(tag)
+ return Tag(
+ name=item_data["name"],
+ id=item_data["id"],
+ colour=item_data["colour"],
+ created=item_data["created"],
+ modified=item_data["modified"],
+ description=item_data['description'],
+ child_tags=self.child_tags_list,
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
+ )
+ else:
+ return Tag(
+ name=item_data["name"],
+ id=item_data["id"],
+ colour=item_data["colour"],
+ created=item_data["created"],
+ modified=item_data["modified"],
+ description=item_data['description'],
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
+ )
+ if items_found > 1:
+ tags_list = []
+ tree =tagData.find('data')
+ for item in tree.findall('Tag'):
+ item_data = {}
+ item_data["id"] = int(item.find("id").text) if item.find('id') is not None else None
+ item_data["name"] = str(item.find("name").text) if item.find('name') is not None else None
+ item_data["created"] = str(item.find("created").text) if item.find('created') is not None else None
+ item_data["modified"] = str(item.find("modified").text) if item.find('modified') is not None else None
+ item_data["colour"] = str(item.find("color").text) if item.find('color') is not None else None
+ item_data['description'] = str(item.find('description').text) if item.find('description') is not None else None
+ item_data['has_children'] = True if item.find('children') is not None else False
+ item_data['rule_type'] = str(item.find('ruleType').text) if item.find('ruleType') is not None else None
+ item_data['rule_value'] = str(item.find('ruleText').text) if item.find('ruleText') is not None else None
+ item_data['criticality'] = int(item.find('criticalityScore').text) if item.find('criticalityScore') is not None else None
+ if item_data['has_children']:
+ self.child_tags_list = []
+ for list in tree.iter('children'):
+ for items in list.iter('list'):
+ for tags in items.iter('TagSimple'):
+ single_tag = int(tags.find('id').text) if item.find('id') is not None else None
+ if single_tag is not None:
+ tag = self.getTag(tag_id=single_tag)
+ self.child_tags_list.append(tag)
+ tags_list.append(Tag(
+ name=item_data["name"],
+ id=item_data["id"],
+ colour=item_data["colour"],
+ created=item_data["created"],
+ modified=item_data["modified"],
+ description=item_data['description'],
+ child_tags=self.child_tags_list,
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
+ ))
+ else:
+ tags_list.append(Tag(
+ name=item_data["name"],
+ id=item_data["id"],
+ colour=item_data["colour"],
+ created=item_data["created"],
+ modified=item_data["modified"],
+ description=item_data['description'],
+ criticality=item_data['criticality'],
+ rule_type=item_data['rule_type'],
+ dynamic_rule=item_data['rule_value'],
+ ))
+ return tags_list
+
+ if items_found < 1:
+ logger.warning(f'Warning: unable to find tags matching: {criteria} {operator} {search_value}')
+ return None
\ No newline at end of file
diff --git a/qualysapi/api_objects.py b/qualysapi/api_objects.py
index bd74826..f8c8f83 100644
--- a/qualysapi/api_objects.py
+++ b/qualysapi/api_objects.py
@@ -1,8 +1,5 @@
import datetime
-
-# from lxml import objectify
-
-
+import zoneinfo as TZ
class Host:
def __init__(self, dns, id, host_type,created,modified,ip=None,last_scan=None,tags=None):
self.dns = str(dns)
@@ -210,7 +207,7 @@ def __init__(self, id: int, uuid: str, name: str, network_id: int, software_vers
self.status = status
class Tag:
- def __init__(self, name: str, id: int, colour: str, created:str, modified:str,child_tags=None,description=None,criticality=None,dynamic=None,dynamic_rule=None):
+ def __init__(self, name: str, id: int, colour: str, created:str, modified:str,child_tags: list | None = None,description: str | None = None,criticality: int | None = None,rule_type: str | None = None,dynamic_rule: str | None = None):
self.name = str(name)
self.id = int(id)
self.colour = str(colour)
@@ -219,17 +216,17 @@ def __init__(self, name: str, id: int, colour: str, created:str, modified:str,ch
date = process_created[0].split("-")
time = process_created[1].split(":")
self.created = datetime.datetime(
- int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2])
+ int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]), tzinfo=TZ.ZoneInfo("UTC") #Qualys defaults to UTC it seems
)
process_modified = str(modified).replace("T", " ").replace("Z", "").split(" ")
date = process_modified[0].split("-")
time = process_modified[1].split(":")
self.modified = datetime.datetime(
- int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2])
+ int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2]), tzinfo=TZ.ZoneInfo("UTC") #Qualys defaults to UTC it seems
)
self.description = description
self.child_tags = child_tags
- self.dynamic = dynamic
+ self.rule_type = rule_type
self.dynamic_rule = dynamic_rule
self.criticality = criticality
def __repr__(self):
diff --git a/qualysapi/connector.py b/qualysapi/connector.py
index 18ad9a2..2ab470d 100644
--- a/qualysapi/connector.py
+++ b/qualysapi/connector.py
@@ -415,7 +415,7 @@ def request(
# self.rate_limit_remaining[api_call],
# )
response = request.text
- print(response)
+ # print(response)
# if (
# "1960" in response
# and "This API cannot be run again until" in response