diff --git a/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py b/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py index e109b954c34..6f4c4dfb9b3 100644 --- a/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py +++ b/projects/rocprofiler-sdk/source/lib/python/rocpd/summary.py @@ -69,7 +69,7 @@ def get_temp_view_names(connection: RocpdImportData) -> List[str]: return [ v[0] for v in execute_statement( - connection, "SELECT name FROM sqlite_temp_master WHERE type='view';" + connection, "SELECT name FROM sqlite_temp_master WHERE type='view'" ).fetchall() ] @@ -81,30 +81,34 @@ def get_temp_view_columns(connection: RocpdImportData, view_name: str) -> List[s return [row[1] for row in cursor.fetchall()] -def make_temp_view_query(view_name, query) -> str: - return "CREATE TEMPORARY VIEW IF NOT EXISTS `{}` AS {}".format(view_name, query) - - -def export_view( - connection: RocpdImportData, view_name, output_format, output_path, filename="" +def export_query( + connection: RocpdImportData, + output_path, + output_file, + output_format, + query_name, + query, ) -> None: - """Write the contents of a SQL view to an output format.""" + """Write the contents of a SQL query to an output format.""" - query = "SELECT * FROM `{}`".format(view_name) - query_one = "SELECT * FROM `{}` LIMIT 1".format(view_name) + query_not_empty = f""" + SELECT EXISTS ( + {query} + ) + """ - # just return if view is empty - if not connection.execute(query_one).fetchone(): + # just return if the result is empty + if not connection.execute(query_not_empty).fetchone()[0]: return # prepare the output filename - if not filename: - output_filename = view_name + if not output_file: + output_filename = query_name else: - output_filename = f"{filename}_{view_name}" + output_filename = f"{output_file}_{query_name}" if output_format == "console": - print(f"\n{view_name.upper()}:") + print(f"\n{query_name.upper()}:") # call query module to export. query will append the extension export_path = os.path.join(output_path, output_filename) @@ -115,10 +119,11 @@ def export_view( def generate_summary_query( view_name: str, + view_query="", name_column="name", by_rank=False, ) -> Tuple[str, str]: - """Generate the SQL statement to create a summary view.""" + """Generate the SQL statement to create a summary query.""" if by_rank: view_suffix = "_summary_by_rank" @@ -148,8 +153,19 @@ def generate_summary_query( full_view_name = f"{view_name}{view_suffix}" + view_select = ( + f""" + {view_name} AS ( + {view_query} + ), + """ + if view_query + else "" + ) + summary_query = f""" WITH + {view_select} avg_data AS ( SELECT {group_by_columns.replace(name_column, f"{name_column} AS name")}, @@ -192,14 +208,16 @@ def generate_summary_query( aggregated_data AD {total_duration_join} ORDER BY - {"AD.pid," if by_rank else ""} AD.total_duration DESC; + {"AD.pid," if by_rank else ""} AD.total_duration DESC """ return (full_view_name, summary_query) -def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[str, str]: - """Generate the SQL statement for domain summary by doing union over all summary views.""" +def generate_domain_query( + connection: RocpdImportData, summary_queries, by_rank=False +) -> Tuple[str, str]: + """Generate the SQL statement for domain summary by doing union over all summary queries.""" if by_rank: view_suffix = "_summary_by_rank" @@ -218,20 +236,27 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s join_condition = "CROSS JOIN total_duration TD" order_by = 'ORDER BY GD."DURATION (nsec)" DESC' - summary_views = [ - itr for itr in get_temp_view_names(connection) if itr.endswith(view_suffix) - ] + summary_dictionary = { + query_name: query + for query_name, query in summary_queries.items() + if query_name.endswith(view_suffix) + } + + if len(summary_dictionary) < 1: + return () - if len(summary_views) < 1: - return view_name + summary_selects = [ + f"{query_name} AS ({query}) ," for query_name, query in summary_dictionary.items() + ] union_selects = [ - f" SELECT '{s.replace(view_suffix, '').upper()}' as domain, * FROM {s} " - for s in summary_views + f" SELECT '{query_name.replace(view_suffix, '').upper()}' as domain, * FROM {query_name} " + for query_name, query in summary_dictionary.items() ] domain_select = f""" WITH + {f"".join(summary_selects)} all_domains AS ( {f" UNION ALL ".join(union_selects)} ), @@ -268,14 +293,14 @@ def generate_domain_query(connection: RocpdImportData, by_rank=False) -> Tuple[s FROM grouped_domains GD {join_condition} - {order_by}; + {order_by} """ return (view_name, domain_select) -def create_summary_views(connection: RocpdImportData, by_rank=False) -> None: - """Create summary views for eligible temporary views in the database.""" +def create_summary_queries(connection: RocpdImportData, by_rank=False): + """Create summary queries for eligible temporary views in the database.""" NAME_COLUMN_MAP = { "memory_allocations": "type", @@ -287,6 +312,8 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None: views = get_temp_view_names(connection) + queries = {} + for view_name in views: if any(pattern in view_name for pattern in avoid_view_pattern): continue @@ -295,30 +322,33 @@ def create_summary_views(connection: RocpdImportData, by_rank=False) -> None: if not required_columns.issubset(columns): continue - # Create regular summary view - summary_view_name, summary_query = generate_summary_query( - view_name, name_column=NAME_COLUMN_MAP.get(view_name, "name") + # Create regular summary query + summary_query_name, summary_query = generate_summary_query( + view_name, "", name_column=NAME_COLUMN_MAP.get(view_name, "name") ) - connection.execute(make_temp_view_query(summary_view_name, summary_query)) + queries[summary_query_name] = summary_query - # Create per-rank summary + # Create per-rank summary query if by_rank: - per_rank_view_name, summary_by_rank_query = generate_summary_query( + per_rank_query_name, summary_by_rank_query = generate_summary_query( view_name, + "", name_column=NAME_COLUMN_MAP.get(view_name, "name"), by_rank=True, ) - connection.execute( - make_temp_view_query(per_rank_view_name, summary_by_rank_query) - ) + queries[per_rank_query_name] = summary_by_rank_query + return queries -def create_summary_region_views( - connection: RocpdImportData, by_rank=False, region_categories=None -) -> None: - """Create summary and region views""" - query = "SELECT DISTINCT(category) FROM regions_and_samples;" +def create_summary_region_queries( + connection: RocpdImportData, + by_rank=False, + region_categories=None, +): + """Create summary and region queries""" + + query = "SELECT DISTINCT(category) FROM regions_and_samples" categories = execute_statement(connection, query).fetchall() if region_categories is None: @@ -331,77 +361,73 @@ def create_summary_region_views( if "MARKER" not in cat.upper() } + queries = {} + for k, v in category_map.items(): if len(v) > 0: conditions = [f"category LIKE '{c}'" for c in v] - temp_region_view = f""" - CREATE TEMPORARY VIEW IF NOT EXISTS `{k}` AS + region_query = f""" SELECT * FROM regions_and_samples - WHERE {" OR ".join(conditions)}; + WHERE {" OR ".join(conditions)} """ - connection.execute(temp_region_view) - - # Create regular summary view - summary_view_name, summary_query = generate_summary_query(k) - connection.execute(make_temp_view_query(summary_view_name, summary_query)) + # Create regular summary query + summary_query_name, summary_query = generate_summary_query(k, region_query) + queries[summary_query_name] = summary_query - # Create per-rank summary view + # Create per-rank summary query if by_rank: - per_rank_view_name, summary_by_rank_query = generate_summary_query( - k, by_rank=True - ) - connection.execute( - make_temp_view_query(per_rank_view_name, summary_by_rank_query) + per_rank_query_name, summary_by_rank_query = generate_summary_query( + k, region_query, by_rank=True ) + queries[per_rank_query_name] = summary_by_rank_query # Markers if "MARKER" not in region_categories: - return + return queries - view_name = "markers" - markers_create = f""" - CREATE TEMPORARY VIEW IF NOT EXISTS `{view_name}` AS + markers_query_name = "markers" + markers_query = """ SELECT JSON_EXTRACT(extdata, '$.message') AS marker_name, * FROM regions_and_samples WHERE category LIKE 'MARKER_%' """ - connection.execute(markers_create) - # Create regular summary view - summary_view_name, summary_query = generate_summary_query( - view_name, name_column="marker_name" + # Create regular summary query + summary_query_name, summary_query = generate_summary_query( + markers_query_name, markers_query, name_column="marker_name" ) - connection.execute(make_temp_view_query(summary_view_name, summary_query)) + queries[summary_query_name] = summary_query - # Create per-rank summary view + # Create per-rank summary query if by_rank: - per_rank_view_name, summary_by_rank_query = generate_summary_query( - view_name, name_column="marker_name", by_rank=True - ) - connection.execute( - make_temp_view_query(per_rank_view_name, summary_by_rank_query) + per_rank_query_name, summary_by_rank_query = generate_summary_query( + markers_query_name, markers_query, name_column="marker_name", by_rank=True ) + queries[per_rank_query_name] = summary_by_rank_query + + return queries -def create_domain_view(connection: RocpdImportData, by_rank=False) -> str: - """Create a domain summary view by aggregating all summary views.""" +def create_domain_query(connection: RocpdImportData, summary_queries, by_rank=False): + """Create a domain summary query by aggregating all summary queries.""" - view_name, domain_query = generate_domain_query(connection, by_rank=by_rank) + result = generate_domain_query(connection, summary_queries, by_rank=by_rank) + if not result: + return {} - # Create the domain summary view - connection.execute(make_temp_view_query(view_name, domain_query)) + query_name, query = result - return view_name + return {query_name: query} def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None: - """Generate all summary views and write them to CSV files.""" + """Generate all summaries and export them to selected format.""" domain_summary = kwargs.get("domain_summary", False) by_rank = kwargs.get("summary_by_rank", False) - filename = kwargs.get("output_file", "") + output_file = kwargs.get("output_file", "") output_path = kwargs.get("output_path", "./rocpd-output-data") region_categories = kwargs.get("region_categories", None) output_format = kwargs.get("format", "console") @@ -417,34 +443,29 @@ def generate_all_summaries(connection: RocpdImportData, **kwargs: Any) -> None: ), ) - # create the temporary summary views - create_summary_views(connection, by_rank) - create_summary_region_views(connection, by_rank, region_categories=region_categories) + summary_queries = {} + + # Create the summary queries + summary_queries.update(create_summary_queries(connection, by_rank)) + summary_queries.update( + create_summary_region_queries( + connection, by_rank, region_categories=region_categories + ) + ) if domain_summary: - create_domain_view(connection) + summary_queries.update(create_domain_query(connection, summary_queries)) # Create domain summary per rank only if both domain_summary and summary_by_rank are enabled if by_rank: - create_domain_view(connection, by_rank=True) - - # Write regular summary views - print("\nSummary files:") - summary_views = [ - itr for itr in get_temp_view_names(connection) if itr.endswith("_summary") - ] - for v in summary_views: - export_view(connection, v, output_format, output_path, filename) + summary_queries.update( + create_domain_query(connection, summary_queries, by_rank=True) + ) - # Write per-rank summary views if flag is set - if by_rank: - print("\nSummary files by rank:") - summary_by_rank_views = [ - itr - for itr in get_temp_view_names(connection) - if itr.endswith("_summary_by_rank") - ] - for v in summary_by_rank_views: - export_view(connection, v, output_format, output_path, filename) + # Export all summary queries + for query_name, query in summary_queries.items(): + export_query( + connection, output_path, output_file, output_format, query_name, query + ) #