Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 125 additions & 104 deletions projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get_temp_view_names(connection: RocpdImportData) -> List[str]:
return [
v[0]
for v in execute_statement(
connection, "SELECT name FROM sqlite_temp_master WHERE type='view';"
connection, "SELECT name FROM sqlite_temp_master WHERE type='view'"
).fetchall()
]

Expand All @@ -81,30 +81,34 @@ def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[s
return [row[1] for row in cursor.fetchall()]


def make_temp_view_query(view_name: str, query: str) -> str:
    """Build the SQL statement that wraps ``query`` in a temporary view.

    Args:
        view_name: Name of the view; it is emitted between backticks.
        query: SQL text that becomes the body of the view.

    Returns:
        A ``CREATE TEMPORARY VIEW IF NOT EXISTS`` statement as a string.
        The statement is only built here; it is not executed.
    """
    # Neither argument is escaped: both are interpolated verbatim.
    return f"CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS {query}"


def export_view(
connection: RocpdImportData, view_name, output_format, output_path, filename=""
def export_query(
connection: RocpdImportData,
output_path,
output_file,
output_format,
query_name,
query,
) -> None:
"""Write the contents of a SQL view to an output format."""
"""Write the contents of a SQL query to an output format."""

query = "SELECT * FROM `{}`".format(view_name)
query_one = "SELECT * FROM `{}` LIMIT 1".format(view_name)
query_not_empty = f"""
SELECT EXISTS (
{query}
)
"""

# just return if view is empty
if not connection.execute(query_one).fetchone():
# just return if the result is empty
if not connection.execute(query_not_empty).fetchone()[0]:
return

# prepare the output filename
if not filename:
output_filename = view_name
if not output_file:
output_filename = query_name
else:
output_filename = f"{filename}_{view_name}"
output_filename = f"{output_file}_{query_name}"

if output_format == "console":
print(f"\n{view_name.upper()}:")
print(f"\n{query_name.upper()}:")

# call query module to export. query will append the extension
export_path = os.path.join(output_path, output_filename)
Expand All @@ -115,10 +119,11 @@ def export_view(

def generate_summary_query(
view_name: str,
view_query="",
name_column="name",
by_rank=False,
) -> Tuple[str, str]:
"""Generate the SQL statement to create a summary view."""
"""Generate the SQL statement to create a summary query."""

if by_rank:
view_suffix = "_summary_by_rank"
Expand Down Expand Up @@ -148,8 +153,19 @@ def generate_summary_query(

full_view_name = f"{view_name}{view_suffix}"

view_select = (
f"""
{view_name} AS (
{view_query}
),
"""
if view_query
else ""
)

summary_query = f"""
WITH
{view_select}
avg_data AS (
SELECT
{group_by_columns.replace(name_column, f"{name_column} AS name")},
Expand Down Expand Up @@ -192,14 +208,16 @@ def generate_summary_query(
aggregated_data AD
{total_duration_join}
ORDER BY
{"AD.pid," if by_rank else ""} AD.total_duration DESC;
{"AD.pid," if by_rank else ""} AD.total_duration DESC
"""

return (full_view_name, summary_query)


def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[str, str]:
"""Generate the SQL statement for domain summary by doing union over all summary views."""
def generate_domain_query(
connection: RocpdImportData, summary_queries, by_rank=False
) -> Tuple[str, str]:
"""Generate the SQL statement for domain summary by doing union over all summary queries."""

if by_rank:
view_suffix = "_summary_by_rank"
Expand All @@ -218,20 +236,27 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s
join_condition = "CROSS JOIN total_duration TD"
order_by = 'ORDER BY GD."DURATION (nsec)" DESC'

summary_views = [
itr for itr in get_temp_view_names(connection) if itr.endswith(view_suffix)
]
summary_dictionary = {
query_name: query
for query_name, query in summary_queries.items()
if query_name.endswith(view_suffix)
}

if len(summary_dictionary) < 1:
return ()

if len(summary_views) < 1:
return view_name
summary_selects = [
f"{query_name} AS ({query}) ," for query_name, query in summary_dictionary.items()
]

union_selects = [
f" SELECT '{s.replace(view_suffix, '').upper()}' as domain, * FROM {s} "
for s in summary_views
f" SELECT '{query_name.replace(view_suffix, '').upper()}' as domain, * FROM {query_name} "
for query_name, query in summary_dictionary.items()
]

domain_select = f"""
WITH
{f"".join(summary_selects)}
all_domains AS (
{f" UNION ALL ".join(union_selects)}
),
Expand Down Expand Up @@ -268,14 +293,14 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s
FROM
grouped_domains GD
{join_condition}
{order_by};
{order_by}
"""

return (view_name, domain_select)


def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
"""Create summary views for eligible temporary views in the database."""
def create_summary_queries(connection: RocpdImportData, by_rank=False):
"""Create summary queries for eligible temporary views in the database."""

NAME_COLUMN_MAP = {
"memory_allocations": "type",
Expand All @@ -287,6 +312,8 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:

views = get_temp_view_names(connection)

queries = {}

for view_name in views:
if any(pattern in view_name for pattern in avoid_view_pattern):
continue
Expand All @@ -295,30 +322,33 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None:
if not required_columns.issubset(columns):
continue

# Create regular summary view
summary_view_name, summary_query = generate_summary_query(
view_name, name_column=NAME_COLUMN_MAP.get(view_name, "name")
# Create regular summary query
summary_query_name, summary_query = generate_summary_query(
view_name, "", name_column=NAME_COLUMN_MAP.get(view_name, "name")
)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
queries[summary_query_name] = summary_query

# Create per-rank summary
# Create per-rank summary query
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
per_rank_query_name, summary_by_rank_query = generate_summary_query(
view_name,
"",
name_column=NAME_COLUMN_MAP.get(view_name, "name"),
by_rank=True,
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
)
queries[per_rank_query_name] = summary_by_rank_query

return queries

def create_summary_region_views(
connection: RocpdImportData, by_rank=False, region_categories=None
) -> None:
"""Create summary and region views"""

query = "SELECT DISTINCT(category) FROM regions_and_samples;"
def create_summary_region_queries(
connection: RocpdImportData,
by_rank=False,
region_categories=None,
):
"""Create summary and region queries"""

query = "SELECT DISTINCT(category) FROM regions_and_samples"
categories = execute_statement(connection, query).fetchall()

if region_categories is None:
Expand All @@ -331,77 +361,73 @@ def create_summary_region_views(
if "MARKER" not in cat.upper()
}

queries = {}

for k, v in category_map.items():
if len(v) > 0:
conditions = [f"category LIKE '{c}'" for c in v]
temp_region_view = f"""
CREATE TEMPORARY VIEW IF NOT EXISTS `{k}` AS
region_query = f"""
SELECT *
FROM regions_and_samples
WHERE {" OR ".join(conditions)};
WHERE {" OR ".join(conditions)}
"""

connection.execute(temp_region_view)

# Create regular summary view
summary_view_name, summary_query = generate_summary_query(k)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
# Create regular summary query
summary_query_name, summary_query = generate_summary_query(k, region_query)
queries[summary_query_name] = summary_query

# Create per-rank summary view
# Create per-rank summary query
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
k, by_rank=True
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
per_rank_query_name, summary_by_rank_query = generate_summary_query(
k, region_query, by_rank=True
)
queries[per_rank_query_name] = summary_by_rank_query

# Markers
if "MARKER" not in region_categories:
return
return queries

view_name = "markers"
markers_create = f"""
CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS
markers_query_name = "markers"
markers_query = """
SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, *
FROM regions_and_samples
WHERE category LIKE 'MARKER_%'
"""
connection.execute(markers_create)

# Create regular summary view
summary_view_name, summary_query = generate_summary_query(
view_name, name_column="marker_name"
# Create regular summary query
summary_query_name, summary_query = generate_summary_query(
markers_query_name, markers_query, name_column="marker_name"
)
connection.execute(make_temp_view_query(summary_view_name, summary_query))
queries[summary_query_name] = summary_query

# Create per-rank summary view
# Create per-rank summary query
if by_rank:
per_rank_view_name, summary_by_rank_query = generate_summary_query(
view_name, name_column="marker_name", by_rank=True
)
connection.execute(
make_temp_view_query(per_rank_view_name, summary_by_rank_query)
per_rank_query_name, summary_by_rank_query = generate_summary_query(
markers_query_name, markers_query, name_column="marker_name", by_rank=True
)
queries[per_rank_query_name] = summary_by_rank_query

return queries


def create_domain_query(connection: RocpdImportData, summary_queries, by_rank=False):
    """Create a domain summary query by aggregating all summary queries.

    Args:
        connection: Database handle, forwarded unchanged to
            ``generate_domain_query``.
        summary_queries: Summary queries keyed by query name (presumably a
            dict of name -> SQL text; it is forwarded as-is).
        by_rank: Forwarded to ``generate_domain_query`` to select the
            per-rank variant.

    Returns:
        A one-entry dict ``{query_name: query}`` for the domain summary, or
        an empty dict when ``generate_domain_query`` returns a falsy result.
        Nothing is executed against the database here.
    """
    result = generate_domain_query(connection, summary_queries, by_rank=by_rank)

    # A falsy result (e.g. an empty tuple) cannot be unpacked into a
    # (name, query) pair, so report "nothing to add" instead.
    if not result:
        return {}

    query_name, query = result
    return {query_name: query}


def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
"""Generate all summary views and write them to CSV files."""
"""Generate all summaries and export them to selected format."""

domain_summary = kwargs.get("domain_summary", False)
by_rank = kwargs.get("summary_by_rank", False)
filename = kwargs.get("output_file", "")
output_file = kwargs.get("output_file", "")
output_path = kwargs.get("output_path", "./rocpd-output-data")
region_categories = kwargs.get("region_categories", None)
output_format = kwargs.get("format", "console")
Expand All @@ -417,34 +443,29 @@ def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None:
),
)

# create the temporary summary views
create_summary_views(connection, by_rank)
create_summary_region_views(connection, by_rank, region_categories=region_categories)
summary_queries = {}

# Create the summary queries
summary_queries.update(create_summary_queries(connection, by_rank))
summary_queries.update(
create_summary_region_queries(
connection, by_rank, region_categories=region_categories
)
)

if domain_summary:
create_domain_view(connection)
summary_queries.update(create_domain_query(connection, summary_queries))
# Create domain summary per rank only if both domain_summary and summary_by_rank are enabled
if by_rank:
create_domain_view(connection, by_rank=True)

# Write regular summary views
print("\nSummary files:")
summary_views = [
itr for itr in get_temp_view_names(connection) if itr.endswith("_summary")
]
for v in summary_views:
export_view(connection, v, output_format, output_path, filename)
summary_queries.update(
create_domain_query(connection, summary_queries, by_rank=True)
)

# Write per-rank summary views if flag is set
if by_rank:
print("\nSummary files by rank:")
summary_by_rank_views = [
itr
for itr in get_temp_view_names(connection)
if itr.endswith("_summary_by_rank")
]
for v in summary_by_rank_views:
export_view(connection, v, output_format, output_path, filename)
# Export all summary queries
for query_name, query in summary_queries.items():
export_query(
connection, output_path, output_file, output_format, query_name, query
)


#
Expand Down
Loading