diff --git a/Pulumi.cape-cod-dev.yaml b/Pulumi.cape-cod-dev.yaml index d805c34..5a33bfc 100644 --- a/Pulumi.cape-cod-dev.yaml +++ b/Pulumi.cape-cod-dev.yaml @@ -759,15 +759,109 @@ config: logging_enabled: True env_vars: - "DAP_REG_DDB_TABLE" + - "WORKFLOW_REG_DDB_TABLE" - "DDB_REGION" - "USER_ATTRS_DDB_TABLE" - "ETL_ATTRS_DDB_TABLE" - "CRAWLER_ATTRS_DDB_TABLE" - "CANNED_REPORT_DDB_TABLE" + - "MWAA_ENVIRONMENT" # env_vars: TODO: add env vars for this API if needed. # TODO: memory and timeouts for these functions need # some love handlers: + - id: "get_workflow_dags_handler" + name: "getdags" + code: "assets/api/capi/handlers/get_workflow_dags.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "getdags Lambda Function" + memory_size: 128 + timeout: 10 + - id: "get_workflow_pipeline_profiles_handler" + name: "getdagprofiles" + code: "assets/api/capi/handlers/get_workflow_pipeline_profiles.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "getdagprofiles Lambda Function" + memory_size: 128 + timeout: 10 + - id: "post_workflow_run_handler" + name: "postdagrun" + code: "assets/api/capi/handlers/post_workflow_run.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "postdagrun Lambda Function" + memory_size: 128 + timeout: 10 + - id: "patch_workflow_run_handler" + name: "patchdagrun" + code: "assets/api/capi/handlers/patch_workflow_run.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "patchdagrun Lambda Function" + memory_size: 128 + timeout: 10 + - id: "get_workflow_run_handler" + name: "getdagrun" + code: "assets/api/capi/handlers/get_workflow_run.py" + layers: + - capi-all + funct_args: + handler: 
"index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "getdagrun Lambda Function" + memory_size: 128 + timeout: 10 + - id: "get_workflow_tasks_handler" + name: "getdagtasks" + code: "assets/api/capi/handlers/get_workflow_tasks.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: "getdagtasks Lambda Function" + memory_size: 128 + timeout: 10 + - id: "get_workflow_run_task_instances_handler" + name: "getdagruntaskinsts" + code: "assets/api/capi/handlers/get_workflow_run_task_instances.py" + layers: + - capi-all + funct_args: + handler: "index.index_handler" + runtime: "python3.10" + architectures: + - "x86_64" + description: + "getdagruntaskinsts Lambda Function" + memory_size: 128 + timeout: 10 - id: "get_daps_handler" name: "getdaps" code: "assets/api/capi/handlers/get_daps.py" @@ -778,7 +872,7 @@ config: runtime: "python3.10" architectures: - "x86_64" - description: "getdaps Lambda Funnction" + description: "getdaps Lambda Function" memory_size: 128 timeout: 3 - id: "get_dap_profile_handler" @@ -791,7 +885,7 @@ config: runtime: "python3.10" architectures: - "x86_64" - description: "getdapprofiles Lambda Funnction" + description: "getdapprofiles Lambda Function" memory_size: 128 timeout: 3 - id: "get_dap_status_handler" @@ -804,7 +898,7 @@ config: runtime: "python3.10" architectures: - "x86_64" - description: "getdapstatus Lambda Funnction" + description: "getdapstatus Lambda Function" memory_size: 128 timeout: 3 - id: "get_dap_logs_handler" @@ -817,9 +911,11 @@ config: runtime: "python3.10" architectures: - "x86_64" - description: "getdaplogs Lambda Funnction" + description: "getdaplogs Lambda Function" memory_size: 128 timeout: 3 + # TODO: if not used as part of workflows (probably + # won't be) this should be removed - id: "submit_dap_run_handler" name: "submitdaprun" code: "assets/api/capi/handlers/submit_dap_run.py" @@ -830,7 +926,7 @@ 
config: runtime: "python3.10" architectures: - "x86_64" - description: "submitdaprun Lambda Funnction" + description: "submitdaprun Lambda Function" memory_size: 128 timeout: 3 - id: "get_raw_objstore_authz_handler" @@ -1273,9 +1369,10 @@ config: # and no compute environments will be deployed compute: # `environments` (mapping, optional) - # Environments for aws batch and mwaa (aairflow) are configured + # Environments for aws batch and mwaa (airflow) are configured # in this section. There are mappings for both types of - # environments, and both mappings contain lists of mappings. + # environments, with the airflow mapping being singular and the + # batch environments being a list of mappings. # # Both types have some shared config keys and then keys specific # to that type. Shared keys are as follows: @@ -1290,9 +1387,8 @@ config: # config file. At least one subnet must be defined here. # environments: - # Each mwaa mapping in the list has the - # following schema in addition to the shared keys mentioned - # above: + # The mwaa mapping has the following schema in addition to + # the shared keys mentioned above: # * `dag_path` (string, required) # The path in the CAPE meta assets s3 bucket where dags # will be stored for airflow to load. 
MWAA syncs these @@ -1332,39 +1428,39 @@ config: # here: # https://www.pulumi.com/registry/packages/aws/api-docs/mwaa/environment/ mwaa: - - name: airflow-env - dag_path: airflow/dags - airflow_version: 3.0.6 - airflow_config: - core.default_task_retries: 2 - core.parallelism: 20 - celery.worker_autoscale: 5,5 - environment_class: mw1.small - subnet_types: - - compute - ingress_subnet_types: - - compute - - vpn - extra_env_args: - min_workers: 2 - max_workers: 10 - logging_configuration: - # NOTE: INFO is the lowest level supported in MWAA - dag_processing_logs: - enabled: True - log_level: INFO - scheduler_logs: - enabled: True - log_level: INFO - task_logs: - enabled: True - log_level: INFO - webserver_logs: - enabled: True - log_level: INFO - worker_logs: - enabled: True - log_level: INFO + name: airflow-env + dag_path: airflow/dags + airflow_version: 3.0.6 + airflow_config: + core.default_task_retries: 2 + core.parallelism: 20 + celery.worker_autoscale: 5,5 + environment_class: mw1.small + subnet_types: + - compute + ingress_subnet_types: + - compute + - vpn + extra_env_args: + min_workers: 2 + max_workers: 10 + logging_configuration: + # NOTE: INFO is the lowest level supported in MWAA + dag_processing_logs: + enabled: True + log_level: INFO + scheduler_logs: + enabled: True + log_level: INFO + task_logs: + enabled: True + log_level: INFO + webserver_logs: + enabled: True + log_level: INFO + worker_logs: + enabled: True + log_level: INFO # Each batch mapping in the list has the # following schema in addition to the shared keys mentioned # above: diff --git a/assets/analysis-pipelines/bactopia/bactopia-base-3.2.0.json b/assets/analysis-pipelines/bactopia/bactopia-base-3.2.0.json index fee36cc..1bf3cdd 100644 --- a/assets/analysis-pipelines/bactopia/bactopia-base-3.2.0.json +++ b/assets/analysis-pipelines/bactopia/bactopia-base-3.2.0.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia", + "pipelineId": "bactopia-bactopia-base-v3.2.0", 
"pipelineDescription": "Execute Bactopia v3.2.0", "project": "bactopia/bactopia", "version": "v3.2.0", diff --git a/assets/analysis-pipelines/bactopia/bactopia-base-dev.json b/assets/analysis-pipelines/bactopia/bactopia-base-dev.json index db4ad42..3777c80 100644 --- a/assets/analysis-pipelines/bactopia/bactopia-base-dev.json +++ b/assets/analysis-pipelines/bactopia/bactopia-base-dev.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia", + "pipelineId": "bactopia-bactopia-base-dev", "pipelineDescription": "Execute Bactopia development release", "project": "bactopia/bactopia", "version": "dev", diff --git a/assets/analysis-pipelines/bactopia/kraken2-bactopia-3.2.0.json b/assets/analysis-pipelines/bactopia/kraken2-bactopia-3.2.0.json index 416d88f..23bf167 100644 --- a/assets/analysis-pipelines/bactopia/kraken2-bactopia-3.2.0.json +++ b/assets/analysis-pipelines/bactopia/kraken2-bactopia-3.2.0.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia Kraken2", + "pipelineId": "bactopia-kraken2-v3.2.0", "pipelineDescription": "Execute Bactopia's Kraken2 workflow with the development release", "project": "bactopia/bactopia", "version": "v3.2.0", diff --git a/assets/analysis-pipelines/bactopia/kraken2-bactopia-dev.json b/assets/analysis-pipelines/bactopia/kraken2-bactopia-dev.json index 5deba2c..10dd8c0 100644 --- a/assets/analysis-pipelines/bactopia/kraken2-bactopia-dev.json +++ b/assets/analysis-pipelines/bactopia/kraken2-bactopia-dev.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia Kraken2", + "pipelineId": "bactopia-kraken2-dev", "pipelineDescription": "Execute Bactopia's Kraken2 workflow with the development release", "project": "bactopia/bactopia", "version": "dev", diff --git a/assets/analysis-pipelines/bactopia/ont-bactopia-3.2.0.json b/assets/analysis-pipelines/bactopia/ont-bactopia-3.2.0.json index 90b8355..5806a08 100644 --- a/assets/analysis-pipelines/bactopia/ont-bactopia-3.2.0.json +++ 
b/assets/analysis-pipelines/bactopia/ont-bactopia-3.2.0.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia ONT Sample", + "pipelineId": "bactopia-ont-v3.2.0", "pipelineDescription": "Execute Bactopia's ONT sample sequencing workflow with v3.2.0", "project": "bactopia/bactopia", "version": "v3.2.0", diff --git a/assets/analysis-pipelines/bactopia/ont-bactopia-dev.json b/assets/analysis-pipelines/bactopia/ont-bactopia-dev.json index 9c4d90b..a57ad44 100644 --- a/assets/analysis-pipelines/bactopia/ont-bactopia-dev.json +++ b/assets/analysis-pipelines/bactopia/ont-bactopia-dev.json @@ -1,6 +1,7 @@ { "pipelineType": "nextflow", "pipelineName": "Bactopia ONT Sample", + "pipelineId": "bactopia-ont-dev", "pipelineDescription": "Execute Bactopia's ONT sample sequencing workflow with the development release", "project": "bactopia/bactopia", "version": "dev", diff --git a/assets/api/capi/capi-openapi-301.yaml.j2 b/assets/api/capi/capi-openapi-301.yaml.j2 index 14d702b..c004a7d 100644 --- a/assets/api/capi/capi-openapi-301.yaml.j2 +++ b/assets/api/capi/capi-openapi-301.yaml.j2 @@ -1277,9 +1277,745 @@ paths: passthroughBehavior: "when_no_match" timeoutInMillis: 29000 type: "mock" + /workflows: + get: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id for a workflow to limit the return to. If + not provided all workflows will be returned. + required: false + - in: query + name: includeDisabled + schema: + type: boolean + description: + Boolean value specifying if disabled workflows should + be included in results. Defaults to false if not + provided. + required: false + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object describing the workflows. Format + described in the Airflow REST v2 API docs + for GET /api/v2/dags. 
+ items: + type: object + properties: + dags: + type: array + items: + type: object + additionalProperties: true + total_entries: + type: integer + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while fetching workflow details. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['get_workflow_dags_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" + /workflows/pipelineprofiles: + get: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id of the workflow to fetch pipeline profiles for. + required: true + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object containing information describing + input required for each pipeline profile + used in the workflow. + properties: + type: object + properties: + dagId: + description: + The DAG ID for the workflow asked for + type: string + pipelineProfiles: + type: array + items: + type: object + additionalProperties: true + "404": + description: Workflow/pipeline Not Found (404) + content: + application/json: + schema: + # this is a CAPE 404, not one that came from + # airflow, so we define our own schema + description: + Could not find specified workflow or a + pipeline specified as part of the workflow + type: object + properties: + detail: + type: string + {# TODO: what other responses here? #} + "500": + description: + Server error while fetching workflow details. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['get_workflow_pipeline_profiles_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" + /workflows/trigger: + post: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id for a workflow to trigger. + required: true + requestBody: + description: Trigger an airflow DAG run. + required: true + content: + application/json: + schema: + type: object + # this format will be defined by the needs of the + # DAG and cannot be defined in a more specific + # manner here + additionalProperties: true + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object describing the submitted DAG run. + Format described in the Airflow REST v2 API + docs for POST /api/v2/dags/{dag_id}/dagRuns + (200 response). 
+ items: + type: object + # this is not our format to define, so + # we'll just say it's any object format + additionalProperties: true + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "409": + description: Airflow Conflict (409) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while triggering workflow run. + x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['post_workflow_run_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + /workflows/halt: + patch: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id for the workflow being halted/failed. 
+ required: true + - in: query + name: dagRunId + schema: + type: string + description: + The dag run id for the workflow being halted/failed. + required: false + requestBody: + description: Halt an airflow DAG run. + required: false + content: + application/json: + schema: + type: object + properties: + note: + type: string + description: An optional note to add to the halted DAG run. + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object describing the halted/failed DAG + run. Format described in the Airflow REST + v2 API docs for PATCH + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} + (200 response). + # this is not our format to define, so + # we'll just say it's any object format + additionalProperties: true + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while halting/failing workflow run. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['patch_workflow_run_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" + /workflows/run: + get: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id for a workflow run being fetched + required: true + - in: query + name: dagRunId + schema: + type: string + description: + The dag run id for a workflow run being fetched + required: true + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object describing the workflow run. 
Format + described in the Airflow REST v2 API docs + for GET + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} + # this is not our format to define, so + # we'll just say it's any object format + additionalProperties: true + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while fetching workflow run details. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['get_workflow_run_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" + /workflows/tasks: + get: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id to get the tasks for + required: true + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: an object containing information on all tasks in the DAG + properties: + tasks: + type: array + description: An array of objects describing the tasks of the DAG + items: + type: object + description: + An object describing one task. 
Format + described in the Airflow REST v2 API docs + for GET + /api/v2/dags/{dag_id}/tasks + # this is not our format to define, so + # we'll just say it's any object format + additionalProperties: true + total_entries: + type: integer + description: the number of items in the tasks array + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while fetching workflow tasks. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['get_workflow_tasks_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" + /workflows/run/taskinstances: + get: + parameters: + - in: query + name: dagId + schema: + type: string + description: + The dag id to get the task instances for + required: true + - in: query + name: dagRunId + schema: + type: string + description: + The dag run id to get the task instances for + required: true + responses: + "200": +{# + TODO: need the response headers (i.e. cors) +#} + description: "Success" + content: + application/json: + schema: + type: object + description: + An object containing information on all + task instances in the DAG run + properties: + tasks_instances: + type: array + description: + An array of objects describing the + task instances of the DAG + items: + type: object + description: + An object describing one task instance. 
Format + described in the Airflow REST v2 API docs + for GET + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances + # this is not our format to define, so + # we'll just say it's any object format + additionalProperties: true + total_entries: + type: integer + description: the number of items in the task_instances array + "400": + description: Airflow Bad Params (400) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "401": + description: Airflow Unauthorized (401) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "403": + description: Airflow Forbidden (403) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "404": + description: Airflow Not Found (404) + content: + application/json: + schema: + oneOf: + - $ref: "#/components/responses/AirflowErrorDetailStrResp" + - $ref: "#/components/responses/AirflowErrorDetailObjResp" + "422": + description: Unprocessable Content/Validation Error (422) + content: + application/json: + schema: + $ref: "#/components/responses/AirflowUnprocessableContentError" + "500": + description: + Server error while fetching workflow tasks. 
+ x-amazon-apigateway-integration: + httpMethod: "POST" + uri: "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/{{ handlers['get_workflow_run_task_instances_handler'] }}/invocations" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "aws_proxy" + options: + responses: + "200": + $ref: "#/components/responses/200OptionsCors" + x-amazon-apigateway-integration: + responses: + default: + statusCode: "200" + responseParameters: + method.response.header.Access-Control-Allow-Methods: "'OPTIONS,GET'" + method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'" + method.response.header.Access-Control-Allow-Origin: "'*'" + requestTemplates: + application/json: "{'statusCode':200}" + passthroughBehavior: "when_no_match" + timeoutInMillis: 29000 + type: "mock" # reusable components that follow OpenApi 3.0.1 spec components: responses: + + AirflowErrorDetailStrResp: + description: + The error response format from the Airflow REST API returning a + detail string + type: object + properties: + detail: + type: string + AirflowErrorDetailObjResp: + description: + The error response format from the Airflow REST API returning a + detail object + type: object + properties: + detail: + type: object + # generally follows RFC 7807: + # https://datatracker.ietf.org/doc/html/rfc7807 + # but needs to be able to handle additions + additionalProperties: true + AirflowUnprocessableContentError: + description: The 422 response format from the Airflow REST API + type: object + properties: + detail: + type: array + items: + type: object + additionalProperties: true 200OptionsCors: description: "200 response" headers: diff --git a/assets/api/capi/handlers/get_dap_profile.py b/assets/api/capi/handlers/get_dap_profile.py index 9d1f2f9..ca8c17d 100644 --- a/assets/api/capi/handlers/get_dap_profile.py +++ b/assets/api/capi/handlers/get_dap_profile.py @@ -1,49 +1,14 @@ """Lambda function for 
handling a post of a new analysis pipeline run.""" import json -from decimal import Decimal from botocore.exceptions import ClientError from capepy.aws.dynamodb import PipelineTable -from capepy.aws.utils import decode_error - - -# TODO: need to add some abstraction of this to capepy. it's repeated here and -# in get_object_etls at least -def bad_param_response(): - """Gets a response data object and status code when bad params are given. - - :return: A tuple containins a response data object and an HTTP 400 status - code. - """ - return ( - { - "message": ( - "Missing required query string parameters: pipeline and version" - ) - }, - 400, - ) - - -# TODO: this should probably go elsewhere. issue is you can't json serialize -# Decimal values, and some of the values coming back from dynamo in the -# pipeline profile spec are Decimal. So this shims them to floats. -def json_serialize_the_unserializable(val): - """Serialize a value (e.g. Decimal) that is otherwise not json serializable. - - Right now this just handles Decimal, but can be updated as needed. - - :param val: The value to serialize. - :return: the serialized value. - :raises: TypeError if even this function cannot serialize. - """ - if isinstance(val, Decimal): - # this results in a reduction of precision which can cause issues. In - # our case (for now at least) it's ok, but we may want to consider other - # mechanisms like string conversions or forcing some rounding. - return float(val) - raise TypeError(f"Value {val} of type {type(val)} is not json serializable") +from capepy.aws.utils import ( + bad_param_response, + decode_error, + json_serialize_the_unserializable, +) def index_handler(event, context): @@ -54,29 +19,40 @@ def index_handler(event, context): :param context: Context object. 
""" + req_params = {"pipeline", "version"} + try: headers = event.get("headers", {}) qsp = event.get("queryStringParameters") if qsp is None: - resp_data, resp_status = bad_param_response() + resp_data, resp_status = bad_param_response(list(req_params)) else: pipeline_name = qsp.get("pipeline") version = qsp.get("version") if not pipeline_name or not version: - resp_data, resp_status = bad_param_response() + resp_data, resp_status = bad_param_response(list(req_params)) else: # get a reference to the registry table ddb_table = PipelineTable() - dap = ddb_table.get_pipeline(pipeline_name, version) + dap = ddb_table.get_pipelines_by_name(pipeline_name, version) resp_data = [] resp_status = 200 + if dap: - resp_data = dap["profile"] - print(f"resp_data: {resp_data}") + if len(dap) == 1: + resp_data = dap[0]["profile"] + else: # must be more than one, which is bad + resp_status = 409 + resp_data = { + "message": ( + f"More than one DAP returned for name " + f"'{pipeline_name}'@'{version}'." + ) + } # And return our response as a 200 return { "statusCode": resp_status, diff --git a/assets/api/capi/handlers/get_dap_status.py b/assets/api/capi/handlers/get_dap_status.py index 07c8b8a..460efca 100644 --- a/assets/api/capi/handlers/get_dap_status.py +++ b/assets/api/capi/handlers/get_dap_status.py @@ -5,7 +5,7 @@ import boto3 from botocore.exceptions import ClientError -from capepy.aws.utils import decode_error +from capepy.aws.utils import bad_param_response, decode_error logger = logging.getLogger(__name__) @@ -13,18 +13,6 @@ batch_client = boto3.client("batch") -def bad_param_response(): - """Gets a response data object and status code when bad params are given. - - :return: A tuple containins a response data object and an HTTP 400 status - code. - """ - return ( - {"message": ("Missing required query string parameters: jobIds")}, - 400, - ) - - def index_handler(event, context): """Handler for the GET of status of analysis pipeline jobs. 
@@ -33,16 +21,18 @@ def index_handler(event, context): :param context: Context object. """ + req_params = {"jobIds"} + try: qsp = event.get("queryStringParameters") resp_status = 200 if qsp is None: - resp_data, resp_status = bad_param_response() + resp_data, resp_status = bad_param_response(list(req_params)) else: job_ids = qsp.get("jobIds") if job_ids is None: - resp_data, resp_status = bad_param_response() + resp_data, resp_status = bad_param_response(list(req_params)) else: response = batch_client.describe_jobs( jobs=[id.strip() for id in job_ids.split(",") if id] diff --git a/assets/api/capi/handlers/get_workflow_dags.py b/assets/api/capi/handlers/get_workflow_dags.py new file mode 100644 index 0000000..b2da6ac --- /dev/null +++ b/assets/api/capi/handlers/get_workflow_dags.py @@ -0,0 +1,90 @@ +"""Lambda function for handling a get of airflow workflow details.""" + +import json +import os + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import decode_error + + +def index_handler(event, context): + """Handler for the GET of one or all airflow workflows. + + This endpoint is a proxy to the airflow /api/v2/dags endpoint. Done as a + lambda instead of direct integration so we can massage data as required. + + This endpoint does not return any CAPE specific data such as the pipeline + profiles of the pipelines in the workflows. That is a separate API call. + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. 
+ """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + try: + qsp = event.get("queryStringParameters") + + api_path = "/dags" + include_disabled = False + + dag_id = None + if qsp is not None: + dag_id = qsp.get("dagId") + + if dag_id is not None: + api_path = f"{api_path}/{dag_id}" + + # TODO: ensure this comes back as python boolean + include_disabled = qsp.get("includeDisabled", False) + + request_params = { + "Name": env_name, + "Path": api_path, + "Method": "GET", + "QueryParameters": {"paused": include_disabled}, + } + + response = mwaa_client.invoke_rest_api(**request_params) + + # no matter the status code of the response we can return the same + # thing. the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": response["RestApiStatusCode"], + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(response["RestApiResponse"]), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during fetch of workflow data from airflow. 
" + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/get_workflow_pipeline_profiles.py b/assets/api/capi/handlers/get_workflow_pipeline_profiles.py new file mode 100644 index 0000000..713c2f6 --- /dev/null +++ b/assets/api/capi/handlers/get_workflow_pipeline_profiles.py @@ -0,0 +1,102 @@ +"""Lambda function for handling a get of pipeline profiles used in a workflow.""" + +import json + +from botocore.exceptions import ClientError +from capepy.aws.dynamodb import PipelineTable, WorkflowMetaTable +from capepy.aws.utils import ( + bad_param_response, + decode_error, + json_serialize_the_unserializable, +) + + +def index_handler(event, context): + """Handler for the GET of the profiles used in a workflow (airflow dag). + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. + """ + + req_params = {"dagId"} + + try: + headers = event.get("headers", {}) + + qsp = event.get("queryStringParameters") + resp_status = 200 + resp_data = {} + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + dag_id = qsp.get("dagId") + + if dag_id is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + workflow_table = WorkflowMetaTable() + wf = workflow_table.get_workflow_by_id(dag_id) + + # get a reference to the registry table + dapreg_table = PipelineTable() + resp_data = [] + resp_status = 200 + + if wf is None: + resp_data = [ + {"detail": f"Could not find workflow with id {dag_id} "} + ] + resp_status = 404 + else: + for pid in wf["pipeline_ids"]: + + dap = dapreg_table.get_pipeline(pid) + if dap: + resp_data.append(dap["profile"]) + else: + # TODO: What other errors to handle here? 
+ resp_data = [ + { + "detail": f"Could not find pipeline profile for pipeline with id {pid} " + } + ] + resp_status = 404 + break + + # And return our response however it worked out + return { + "statusCode": resp_status, + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps( + resp_data, default=json_serialize_the_unserializable + ), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during fetch of workflow pipeline profiles. " + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/get_workflow_run.py b/assets/api/capi/handlers/get_workflow_run.py new file mode 100644 index 0000000..1f90140 --- /dev/null +++ b/assets/api/capi/handlers/get_workflow_run.py @@ -0,0 +1,93 @@ +"""Lambda function for handling a get of an airflow workflow run.""" + +import json +import os + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import bad_param_response, decode_error + + +def index_handler(event, context): + """Handler for the GET of one or all airflow workflow runs of a DAG. + + This endpoint is a proxy to the airflow + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} endpoint. Done as a + lambda instead of direct integration so we can massage data as required. 
+ + This endpoint does not return any CAPE specific data such as the pipeline + profiles of the pipelines in the workflows. That is a separate API call. + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. + """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + req_params = {"dagId", "dagRunId"} + + try: + qsp = event.get("queryStringParameters") + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + dag_id = qsp.get("dagId") + dag_run_id = qsp.get("dagRunId") + + if None in [dag_id, dag_run_id]: + resp_data, resp_status = bad_param_response(list(req_params)) + + else: + api_path = f"/dags/{dag_id}/dagRuns/{dag_run_id}" + + request_params = { + "Name": env_name, + "Path": api_path, + "Method": "GET", + } + + response = mwaa_client.invoke_rest_api(**request_params) + resp_data = response["RestApiResponse"] + resp_status = response["RestApiStatusCode"] + + # no matter the status code of the response we can return the same + # thing. the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": resp_status, + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. 
+ "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(resp_data), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during fetch of workflow run data from airflow. " + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/get_workflow_run_task_instances.py b/assets/api/capi/handlers/get_workflow_run_task_instances.py new file mode 100644 index 0000000..f2f9faf --- /dev/null +++ b/assets/api/capi/handlers/get_workflow_run_task_instances.py @@ -0,0 +1,93 @@ +"""Lambda function for handling a get of an airflow workflow run.""" + +import json +import os + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import bad_param_response, decode_error + + +def index_handler(event, context): + """Handler for the GET of all task instances for an airflow workflow runs. + + This endpoint is a proxy to the airflow + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances endpoint. Done as + a lambda instead of direct integration so we can massage data as required. + + This endpoint does not return any CAPE specific data such as the pipeline + profiles of the pipelines in the workflows. That is a separate API call. + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. 
+ """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + req_params = {"dagId", "dagRunId"} + + try: + qsp = event.get("queryStringParameters") + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + dag_id = qsp.get("dagId") + dag_run_id = qsp.get("dagRunId") + + if None in [dag_id, dag_run_id]: + resp_data, resp_status = bad_param_response(list(req_params)) + + else: + api_path = f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" + + request_params = { + "Name": env_name, + "Path": api_path, + "Method": "GET", + } + + response = mwaa_client.invoke_rest_api(**request_params) + resp_data = response["RestApiResponse"] + resp_status = response["RestApiStatusCode"] + + # no matter the status code of the response we can return the same + # thing. the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": resp_status, + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(resp_data), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during fetch of workflow run data from airflow. 
" + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/get_workflow_tasks.py b/assets/api/capi/handlers/get_workflow_tasks.py new file mode 100644 index 0000000..a8fe703 --- /dev/null +++ b/assets/api/capi/handlers/get_workflow_tasks.py @@ -0,0 +1,92 @@ +"""Lambda function for handling a get of an airflow workflow run.""" + +import json +import os + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import bad_param_response, decode_error + + +def index_handler(event, context): + """Handler for the GET all tasks for an airflow workflow DAG. + + This endpoint is a proxy to the airflow /api/v2/dags/{dag_id}/tasks + endpoints. Done as a lambda instead of direct integration so we can + massage data as required. + + This endpoint does not return any CAPE specific data such as the pipeline + profiles of the pipelines in the workflows. That is a separate API call. + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. + """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + req_params = {"dagId"} + + try: + qsp = event.get("queryStringParameters") + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + dag_id = qsp.get("dagId") + + if dag_id is None: + resp_data, resp_status = bad_param_response(list(req_params)) + + else: + api_path = f"/dags/{dag_id}/tasks" + + request_params = { + "Name": env_name, + "Path": api_path, + "Method": "GET", + } + + response = mwaa_client.invoke_rest_api(**request_params) + resp_data = response["RestApiResponse"] + resp_status = response["RestApiStatusCode"] + + # no matter the status code of the response we can return the same + # thing. 
the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": resp_status, + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(resp_data), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during fetch of workflow run data from airflow. " + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/patch_workflow_run.py b/assets/api/capi/handlers/patch_workflow_run.py new file mode 100644 index 0000000..132679c --- /dev/null +++ b/assets/api/capi/handlers/patch_workflow_run.py @@ -0,0 +1,97 @@ +"""Lambda function for handling a patch to halt an airflow dag run.""" + +import datetime +import json +import os + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import bad_param_response, decode_error + + +def index_handler(event, context): + """Handler for the PATCH to halt an airflow dag run. + + This endpoint is a proxy to the airflow + /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} endpoint. Done as a lambda + instead of direct integration so we can massage data as required. 
+ + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. + """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + try: + req_params = {"dagId", "dagRunId"} + + qsp = event.get("queryStringParameters") + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + dag_id = qsp.get("dagId") + dag_run_id = qsp.get("dagRunId") + body = json.loads(event["body"]) + + if not all([dag_id, dag_run_id]): + resp_data, resp_status = bad_param_response(req_params) + else: + update_mask = ["state"] + note = body.get("note") + req_body = {"state": "failed"} + + if note: + update_mask.append("note") + req_body.update({"note": note}) + + request_params = { + "Name": env_name, + "Path": f"/dags/{dag_id}/dagRuns/{dag_run_id}", + "Method": "PATCH", + "QueryParameters": {"update_mask": update_mask}, + "Body": req_body, + } + + response = mwaa_client.invoke_rest_api(**request_params) + + # no matter the status code of the response we can return the same + # thing. the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": response["RestApiStatusCode"], + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. + # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. 
+ "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(response["RestApiResponse"]), + } + except ClientError as err: + code, message = decode_error(err) + + msg = ( + f"Error during halting/failing of airflow dag run. " + f"{code} {message}" + ) + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/api/capi/handlers/post_workflow_run.py b/assets/api/capi/handlers/post_workflow_run.py new file mode 100644 index 0000000..a4f8a30 --- /dev/null +++ b/assets/api/capi/handlers/post_workflow_run.py @@ -0,0 +1,111 @@ +"""Lambda function for handling a post to trigger an airflow dag.""" + +import datetime +import json +import os +import re + +import boto3 +from botocore.exceptions import ClientError +from capepy.aws.utils import bad_param_response, decode_error + + +def index_handler(event, context): + """Handler for the POST to trigger an airflow dag. + + This endpoint is a proxy to the airflow /api/v2/dags/{dag_id}/dagRuns + endpoint. Done as a lambda instead of direct integration so we can massage + data as required. + + :param event: The event object that contains the HTTP request and json + data. + :param context: Context object. 
+ """ + + env_name = os.getenv("MWAA_ENVIRONMENT") + + # TODO: add this to capepy + mwaa_client = boto3.client("mwaa") + + try: + req_params = {"dagId"} + + qsp = event.get("queryStringParameters") + + if qsp is None: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + + dag_id = qsp.get("dagId") + dag_params = json.loads(event["body"]) + + if not dag_id: + resp_data, resp_status = bad_param_response(list(req_params)) + else: + + # TODO: we can add some additional run params that airflow + # supports: + # - specific run id (probably don't want people using this + # usually if ever) + # - note (freetext string) + # - run_after (if we want to get into scheduling from + # users) + # - there are others like data_interval_[start|end] that + # are used internally in dags that process data of + # specific intervals + + # the logical date must be specified (but may be null) when + # triggering. we're specifying the value. *but* it wants ISO + # 8601 format ending in `Z` which isn't supported in python + # natively till v3.11. so this makes the time string then + # replaces the bad part with `Z` so the airflow api accepts it. + now_str = datetime.datetime.now().isoformat() + zstr = re.sub(r"\..*$", "Z", now_str) + + request_params = { + "Name": env_name, + "Path": f"/dags/{dag_id}/dagRuns", + "Method": "POST", + "Body": { + "conf": dag_params, + "logical_date": zstr, # datetime.datetime.now().isoformat(), + }, + } + + response = mwaa_client.invoke_rest_api(**request_params) + + resp_data = response["RestApiResponse"] + resp_status = response["RestApiStatusCode"] + # no matter the status code of the response we can return the same + # thing. the difference in 200 vs non-200 is that the json will contain + # an error string under the key "detail" instead of workflow data in + # the non-200 case + return { + "statusCode": resp_status, + "headers": { + "Content-Type": "application/json", + # TODO: ISSUE #141 CORS bypass. We do not want this long term. 
+ # When we get all the api and web resources on the same + # domain, this may not matter too much. But we may + # eventually end up with needing to handle requests from + # one domain served up by another domain in a lambda + # handler. In that case we'd need to be able to handle + # CORS, and would want to look into allowing + # configuration of the lambda (via pulumi config that + # turns into env vars for the lambda) that set the + # origins allowed for CORS. + "Access-Control-Allow-Headers": "Content-Type", + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "OPTIONS,GET", + }, + "body": json.dumps(resp_data), + } + except ClientError as err: + code, message = decode_error(err) + + msg = f"Error during trigger of airflow dag run. {code} {message}" + + return { + "statusCode": 500, + "body": msg, + } diff --git a/assets/lambda-layers/capepy/capepy-2.2.0-py3-none-any.whl b/assets/lambda-layers/capepy/capepy-3.0.0-py3-none-any.whl similarity index 50% rename from assets/lambda-layers/capepy/capepy-2.2.0-py3-none-any.whl rename to assets/lambda-layers/capepy/capepy-3.0.0-py3-none-any.whl index eb29689..fee97c4 100644 Binary files a/assets/lambda-layers/capepy/capepy-2.2.0-py3-none-any.whl and b/assets/lambda-layers/capepy/capepy-3.0.0-py3-none-any.whl differ diff --git a/assets/lambda-layers/capepy/capepy_layer.zip b/assets/lambda-layers/capepy/capepy_layer.zip index b23b6ba..0023c17 100644 Binary files a/assets/lambda-layers/capepy/capepy_layer.zip and b/assets/lambda-layers/capepy/capepy_layer.zip differ diff --git a/assets/lambda-layers/capi-all/requirements.txt b/assets/lambda-layers/capi-all/requirements.txt index ed72c1f..4f473ad 100644 --- a/assets/lambda-layers/capi-all/requirements.txt +++ b/assets/lambda-layers/capi-all/requirements.txt @@ -1 +1 @@ -capepy==2.2.0 +capepy==3.0.0 diff --git a/capeinfra/iam.py b/capeinfra/iam.py index cbedf65..ae872e0 100644 --- a/capeinfra/iam.py +++ b/capeinfra/iam.py @@ -313,10 +313,18 @@ def 
get_api_statements( "actions": [ "dynamodb:DescribeTable", "dynamodb:GetItem", + "dynamodb:Query", "dynamodb:Scan", ], "resources": [ f"arn:aws:dynamodb:*:*:table/{t}", + # TODO: we now have an index that needs to be queried, which + # doesn't get only the table name but also the index + # name here. just opening to all indices right now as + # we're moving away from the `get_api_statements` way + # of doing things toward resource provided policies + # and need to handle this in that switch. + f"arn:aws:dynamodb:*:*:table/{t}/index/*", ], }, ) diff --git a/capeinfra/meta/capemeta.py b/capeinfra/meta/capemeta.py index 23088a6..c993b56 100644 --- a/capeinfra/meta/capemeta.py +++ b/capeinfra/meta/capemeta.py @@ -12,7 +12,6 @@ from boto3.dynamodb.types import TypeSerializer from pulumi import ( AssetArchive, - Config, FileArchive, FileAsset, Output, @@ -179,7 +178,7 @@ def __init__(self, assets_bucket: VersionedBucket, **kwargs): ) self.bucket = assets_bucket - capepy_whl = "capepy-2.2.0-py3-none-any.whl" + capepy_whl = "capepy-3.0.0-py3-none-any.whl" self.object = self.bucket.add_object( f"{self.name}-object", key=capepy_whl, @@ -259,7 +258,11 @@ def __init__(self, **kwargs): name="cape-users", account_recovery_setting={ "recovery_mechanisms": [ - {"name": "verified_email", "priority": 1} + # HACK: pyright typing seems broken for identifying + # dictionary, this is a hacky fix for that + aws.cognito.UserPoolAccountRecoverySettingRecoveryMechanismArgs( + name="verified_email", priority=1 + ) ] }, admin_create_user_config={"allow_admin_create_user_only": True}, diff --git a/capeinfra/pipeline/airflow.py b/capeinfra/pipeline/airflow.py index 313834f..c2b6b08 100644 --- a/capeinfra/pipeline/airflow.py +++ b/capeinfra/pipeline/airflow.py @@ -1,6 +1,7 @@ """Abstractions for Apache Airflow.""" import json +from enum import Enum import pulumi_aws as aws from pulumi import Input, Output, ResourceOptions @@ -23,6 +24,11 @@ class MwaaEnvironment(CapeComponentResource): to change 
or have a new set of resources defined. """ + class PolicyEnum(str, Enum): + """Enum of supported policy names for this component.""" + + invoke_api = "invoke_api" + @property def default_config(self): return {"dag_path": "airflow/dags", "extra_env_args": {}} @@ -49,7 +55,7 @@ def base_role_policy_statements( "effect": "Allow", "actions": "airflow:PublishMetrics", "resources": [ - f"arn:aws:airflow:{self.aws_region}:767397883306:environment/{self.mwaa_env_name}" + f"arn:aws:airflow:{self.aws_region}:{self.aws_account_id}:environment/{self.mwaa_env_name}" ], }, { @@ -64,7 +70,7 @@ def base_role_policy_statements( "logs:GetQueryResults", ], "resources": [ - f"arn:aws:logs:{self.aws_region}:767397883306:log-group:airflow-{self.mwaa_env_name}-*" + f"arn:aws:logs:{self.aws_region}:{self.aws_account_id}:log-group:airflow-{self.mwaa_env_name}-*" ], }, { @@ -107,7 +113,7 @@ def base_role_policy_statements( ], # our mwaa env uses an aws-managed set of keys, so we # have to explicitly deny our own account's keyspace - "not_resources": "arn:aws:kms:*:767397883306:key/*", + "not_resources": f"arn:aws:kms:*:{self.aws_account_id}:key/*", "conditions": [ { "test": "StringLike", @@ -126,6 +132,7 @@ def __init__( subnets: dict[str, aws.ec2.Subnet], ingress_subnets: dict[str, aws.ec2.Subnet], aws_region: str, + aws_account_id: str, extra_policy_statements: ( list[aws.iam.GetPolicyDocumentStatementArgsDict] | None ) = None, @@ -150,6 +157,7 @@ def __init__( self.name = f"{name}" self.mwaa_env_name = f"{self.name}-env" self.aws_region = aws_region + self.aws_account_id = aws_account_id # get in a var so we aren't typing super long lines anywhere ma_bucket = capeinfra.meta.automation_assets_bucket @@ -275,6 +283,24 @@ def __init__( } ) + @property + def policies(self) -> dict[ + str, + list[aws.iam.GetPolicyDocumentStatementArgsDict], + ]: + if self._policies is None: + self._policies = dict[ + str, + list[aws.iam.GetPolicyDocumentStatementArgsDict], + ]() + 
self._policies[self.PolicyEnum.invoke_api] = [ + { + "effect": "Allow", + "actions": ["airflow:InvokeRestApi"], + } + ] + return self._policies + # TODO: feels this should be able to pass exactly one role, or be able to # pass exactly one role to exactly one named batch compute env. # TODO: ISSUE #338 diff --git a/capeinfra/pipeline/dapregistry.py b/capeinfra/pipeline/registry.py similarity index 85% rename from capeinfra/pipeline/dapregistry.py rename to capeinfra/pipeline/registry.py index 1aef71b..21925b3 100644 --- a/capeinfra/pipeline/dapregistry.py +++ b/capeinfra/pipeline/registry.py @@ -107,11 +107,14 @@ def create_dap_registry_table(self): # probably be much cheaper to go that route if we have a # really solid idea of how many reads/writes this table needs billing_mode="PAY_PER_REQUEST", - hash_key="pipeline_name", - range_key="version", + hash_key="pipeline_id", attributes=[ # NOTE: we do not need to define any part of the "schema" here # that isn't needed in an index. + { + "name": "pipeline_id", + "type": "S", + }, { "name": "pipeline_name", "type": "S", @@ -121,7 +124,39 @@ def create_dap_registry_table(self): "type": "S", }, ], - opts=ResourceOptions(parent=self), + global_secondary_indexes=[ + aws.dynamodb.TableGlobalSecondaryIndexArgs( + name="PipelineNameVerIndex", + key_schemas=[ + aws.dynamodb.TableGlobalSecondaryIndexKeySchemaArgs( + attribute_name="pipeline_name", + key_type="HASH", + ), + aws.dynamodb.TableGlobalSecondaryIndexKeySchemaArgs( + attribute_name="version", + key_type="RANGE", + ), + ], + projection_type="ALL", + non_key_attributes=[], + read_capacity=0, + write_capacity=0, + ) + ], + opts=ResourceOptions( + parent=self, + # TODO: this is a bazooka against an ant. the GSI keeps showing + # changes in the for of adding `__defaults: []` in the GSI + # and each entry in the key_schemas as well as removal of + # the previously specified hash_key and range_key values + # (which were replaced by key_schemas recently). 
this + # despite the table being removed and rebuilt totally + # since removal of the `[hash|range]_key` fields (meaning + # they should no longer be involved at all). so we're + # ignoring all GSI changes reported from the server. which + # is bad if we make a real change...need to fix + ignore_changes=["global_secondary_indexes"], + ), tags={ "desc_name": ( f"{self.desc_name} Analysis Pipeline Registry DynamoDB Table" @@ -187,12 +222,10 @@ def load_pipeline_assets(self): f"{self.name}-{stem}-ddbitem", table_name=self.analysis_pipeline_registry_ddb_table.name, hash_key=self.analysis_pipeline_registry_ddb_table.hash_key, - range_key=self.analysis_pipeline_registry_ddb_table.range_key.apply( - lambda rk: f"{rk}" - ), item=Output.json_dumps( { "pipeline_name": {"S": profile["pipelineName"]}, + "pipeline_id": {"S": profile["pipelineId"]}, "version": {"S": profile["version"]}, "project": {"S": profile["project"]}, "pipeline_type": {"S": profile["pipelineType"]}, diff --git a/capeinfra/resources/api.py b/capeinfra/resources/api.py index 07d51d5..65b49d5 100644 --- a/capeinfra/resources/api.py +++ b/capeinfra/resources/api.py @@ -10,7 +10,7 @@ from collections.abc import Mapping import pulumi_aws as aws -from pulumi import AssetArchive, FileAsset, Output, ResourceOptions, log +from pulumi import AssetArchive, FileAsset, Output, ResourceOptions import capeinfra from capeinfra.iam import ( @@ -42,10 +42,16 @@ def __init__( spec_path: str, stage_suffix: str, env_vars: Mapping[str, Output[str] | str], - resource_grants: dict[str, list[Output]], + legacy_resource_grants: dict[str, list[Output]], + policy_statements: list[aws.iam.GetPolicyDocumentStatementArgsDict], vpc_endpoint: aws.ec2.VpcEndpoint, domain_name: Output, *args, + # TODO: the vpc config was added hastily here as we need the + # lambdas to be deployed in the vpc to have access to MWAA. + # may not need changing, but the design should be thought more + # about here. 
+ lambda_vpc_cfg: aws.lambda_.FunctionVpcConfigArgs | None = None, **kwargs, ): """Constructor. @@ -73,6 +79,8 @@ def __init__( requests will pass. domain_name: The domain name (e.g. api.cape-dev.org) on which this API will reside. + lambda_vpc_cfg: If specified, the vpc config for lambda endpoint + functions. authorizer_path: Optional path to the source file for a lambda authorizer for the API. If not provided, no authorizer will be configured for the API. @@ -86,6 +94,7 @@ def __init__( self.spec_path = spec_path self.api_vpcendpoint = vpc_endpoint self.domain_name = domain_name + self.lambda_vpc_cfg = lambda_vpc_cfg # this will map the ids (string ids) from the config to a tuple of # (function name, Lambda Function Resource) so we can fill in the @@ -95,7 +104,7 @@ def __init__( self._ids_to_lambdas = {} self._configure_logging() - self._create_api_ep_lambdas(resource_grants) + self._create_api_ep_lambdas(legacy_resource_grants, policy_statements) self._create_api_authorizer_lambdas() self._create_aws_proxy_roles() self._render_spec() @@ -152,7 +161,8 @@ def _configure_logging(self): def _create_api_ep_lambdas( self, - res_grants: dict[str, list[Output]], + legacy_res_grants: dict[str, list[Output]], + policy_statements: list[aws.iam.GetPolicyDocumentStatementArgsDict], ): """Create the Lambda functions acting as endpoint handlers for the API. @@ -170,16 +180,30 @@ def _create_api_ep_lambdas( # read from DynamoDB in another, this role's policy must have both # those grants). This may not be the long term implementation. 
# TODO: ISSUE 245 + + # TODO: migrate policies into each policy granting resource + all_policy_statements = aggregate_statements( + [ + Output.all( + grants=legacy_res_grants, + ).apply(lambda kwargs: get_api_statements(**kwargs)) + ] + + policy_statements + ) + self._api_lambda_role = get_inline_role( f"{self.name}-lmbd-role", f"{self.desc_name} {self.config.get('desc')} lambda role", "lmbd", "lambda.amazonaws.com", - # TODO: migrate policies into each policy granting resource - Output.all( - grants=res_grants, - ).apply(lambda kwargs: get_api_statements(**kwargs)), - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + all_policy_statements, + [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + # TODO: This is needed to make use of the endpoints requiring vpc + # access to things like MWAA. we probably want to limit it + # to those lambdas longer term + "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole", + ], ) # make functions from the configuration and save the mapping of the @@ -228,6 +252,7 @@ def _create_api_ep_lambdas( ), memory_size=funct_args.get("memory_size", 128), timeout=funct_args.get("timeout", 3), + vpc_config=self.lambda_vpc_cfg, ) # update our mapping of function ids from the config to the name and diff --git a/capeinfra/resources/database.py b/capeinfra/resources/database.py index 3d8900c..37e7cd5 100644 --- a/capeinfra/resources/database.py +++ b/capeinfra/resources/database.py @@ -83,6 +83,7 @@ def policies(self) -> dict[ "actions": [ "dynamodb:DescribeTable", "dynamodb:GetItem", + "dynamodb:Query", # TODO: arguably we don't need to include scan here. # scan reads the whole table (or a bunch of the # table) at once. we grant it currently for the diff --git a/capeinfra/swimlane.py b/capeinfra/swimlane.py index 68c79fc..dab0bf3 100644 --- a/capeinfra/swimlane.py +++ b/capeinfra/swimlane.py @@ -10,7 +10,6 @@ # TODO: ISSUE #145 this import is only needed for the temporary DAP S3 handling. 
# it should not be here after 145. from capeinfra.datalake.datalake import CatalogDatabase -from capeinfra.pipeline.airflow import MwaaEnvironment from capeinfra.pipeline.batch import BatchCompute, BatchJobDefinition from capeinfra.pipeline.ecr import ContainerRepository from capeinfra.resources.certs import BYOCert @@ -80,13 +79,6 @@ def __init__( # to. e.g. self.az_assets["us-east-2b"]["inet_nat_gw"] is the internet # facing nat gateway for az "us=east-2b" self.az_assets = dict[str, dict[str, Any]]() - # TODO: there's maybe a common base class that could be made for batch - # and mwaa (and later additions too) environments. I don't really - # see how the envs are used after being created, so i don't yet - # know if keeping all compute envs together is a good idea or not. - # For now we'll just keep them separate until we show airflow - # working. - self.mwaa_compute_environments = dict[str, MwaaEnvironment]() self.batch_compute_environments = dict[str, BatchCompute]() self.job_definitions = dict[str, BatchJobDefinition]() self.albs = {} diff --git a/capeinfra/swimlanes/private.py b/capeinfra/swimlanes/private.py index 00fb03d..6374c52 100644 --- a/capeinfra/swimlanes/private.py +++ b/capeinfra/swimlanes/private.py @@ -8,6 +8,7 @@ import pulumi_aws as aws from pulumi import Config, Output, ResourceOptions, warn +from pulumi_aws.iam import GetPolicyDocumentStatementArgsDict from pulumi_synced_folder import S3BucketFolder import capeinfra @@ -22,7 +23,7 @@ get_vpce_api_invoke_policy, ) from capeinfra.pipeline.airflow import MwaaEnvironment -from capeinfra.pipeline.dapregistry import DAPRegistry, WorkflowMetaRegistry +from capeinfra.pipeline.registry import DAPRegistry, WorkflowMetaRegistry # TODO: ISSUE #145 This import is to support the temporary dap results s3 # handling. 
@@ -100,6 +101,9 @@ def __init__(self, name, *args, **kwargs): aws_config = Config("aws") self.aws_region = aws_config.require("region") + self.aws_account_id = aws.get_caller_identity().account_id + + self.mwaa_compute_environment = None # will contain a mapping of env var labels to resource names and types. # these may be used in api configuration to state the need for a @@ -212,6 +216,9 @@ def _deploy_api(self, api_name): resource_grants = {} for ev in self.apis[api_name]["spec"].get("env_vars", []): env_vars.setdefault(ev, self._exposed_env_vars[ev]["resource_name"]) + + # TODO: this is the old style policy management we should be moving + # away from res = resource_grants.setdefault( self._exposed_env_vars[ev]["type"], [] ) @@ -221,6 +228,42 @@ def _deploy_api(self, api_name): if self._exposed_env_vars[ev]["resource_name"] not in res: res.append(self._exposed_env_vars[ev]["resource_name"]) + # TODO: this is the new style policy statements we should be moving to + policy_statements = [] + + if self.mwaa_compute_environment is not None: + mep = self.mwaa_compute_environment.policies[ + MwaaEnvironment.PolicyEnum.invoke_api + ] + policy_statements.append( + Output.all( + arn=self.mwaa_compute_environment.mwaa_environment.arn, + name=self.mwaa_compute_environment.mwaa_environment.name, + ).apply( + # adding invoke rest api perms for the `Op` default role in + # airflow. Need Op as we will be configuring runs in + # addition to triggering (if no config, we'd be able to use + # User role) + lambda kwargs: add_resources( + mep, + # TODO: this isn't the arn of the environment or of the + # execution role. rather it seems to be the arn of + # the Op role in airflow. anyway, if not specified + # like this (previously was using the env arn with + # `/Op` at the end) it fails.
Need a good way to + # construct this + f"arn:aws:airflow:{self.aws_region}:{self.aws_account_id}:role/{kwargs['name']}/Op", + f"{kwargs['arn']}", + ) + ) + ) + + sgis = [] + if self.mwaa_compute_environment is not None: + # TODO: hijacking the sec group here, need to either make this + # one a default for a lot of things or create a new one + sgis = [self.mwaa_compute_environment.security_group.id] + self.apis[api_name]["deploy"] = CapeRestApi( f"{self.basename}-{api_name}-api", api_name, @@ -228,11 +271,18 @@ def _deploy_api(self, api_name): self.api_stage_suffix, env_vars, resource_grants, + policy_statements, self.api_vpcendpoint, self.apigw_domainname.domain_name, config=self.apis[api_name]["spec"], desc_name=f"{self.apis[api_name]['spec']['desc']}", opts=ResourceOptions(parent=self), + lambda_vpc_cfg=aws.lambda_.FunctionVpcConfigArgs( + security_group_ids=sgis, + subnet_ids=[ + sn.id for sn in self.get_subnets_by_type("compute").values() + ], + ), ) def create_airflow_compute_environment(self): @@ -248,26 +298,29 @@ def create_airflow_compute_environment(self): # environments. but we support more if needed. for now we assume they # will have the same set of policies. 
- for env in self.config.get( - "compute", "environments", "mwaa", default=[] - ): - name = env.get("name") + mwaa_cfg = self.config.get( + "compute", "environments", "mwaa", default=None + ) + + if mwaa_cfg is not None: + name = mwaa_cfg.get("name") env_subnets = dict[str, aws.ec2.Subnet]() - for st in env.get("subnet_types"): + for st in mwaa_cfg.get("subnet_types"): env_subnets.update(self.get_subnets_by_type(st)) ingress_subnets = dict[str, aws.ec2.Subnet]() - for st in env.get("ingress_subnet_types"): + for st in mwaa_cfg.get("ingress_subnet_types"): ingress_subnets.update(self.get_subnets_by_type(st)) - self.mwaa_compute_environments[name] = MwaaEnvironment( + self.mwaa_compute_environment = MwaaEnvironment( f"{self.basename}-{name}-mwaa", vpc=self.vpc, subnets=env_subnets, ingress_subnets=ingress_subnets, - config=env, - aws_region=capeinfra.data_lakehouse.aws_region, + config=mwaa_cfg, + aws_region=self.aws_region, + aws_account_id=self.aws_account_id, # TODO: add policy attachments extra_policy_statements=None, ) @@ -275,16 +328,51 @@ def create_airflow_compute_environment(self): # now configure the new mwaa environment to be able to pass the # batch roles to batch # TODO: ISSUE #338 - self.mwaa_compute_environments[ - name - ].configure_batch_compute_pass_role( + self.mwaa_compute_environment.configure_batch_compute_pass_role( [bce for bce in list(self.batch_compute_environments.values())] ) - self.mwaa_compute_environments[name].configure_batch_job_def_policy( + self.mwaa_compute_environment.configure_batch_job_def_policy( [bjd for bjd in list(self.job_definitions.values())] ) + self._exposed_env_vars.setdefault( + "MWAA_ENVIRONMENT", + { + "resource_name": self.mwaa_compute_environment.mwaa_environment.name, + # TODO: this is just to keep the same interface we were + # using for exposed env vars and res grants before + # moving to resource provided policies. 
We should + # ideally never care about type anymore once we have + # moved everything to that pattern and should get rid + # of this + "type": "untyped", + }, + ) + + # need this VPCE in order to be able to hit the airflow rest api + self.mwaa_env_vpcendpoint = aws.ec2.VpcEndpoint( + f"{self.basename}-{name}-mwaa-vpce", + vpc_id=self.vpc.id, + service_name=f"com.amazonaws.{aws.get_region().region}.airflow.env", + vpc_endpoint_type="Interface", + private_dns_enabled=True, + subnet_ids=[ + s.id + for _, s in self.get_subnets_by_type( + SubnetType.COMPUTE + ).items() + ], + # TODO: hijacking the sec group here, need to either make this + # one a default for a lot of things or create a new one + security_group_ids=[ + self.mwaa_compute_environment.security_group.id + ], + tags={ + "desc_name": f"{self.desc_name} MWAA Environment endpoint", + }, + ) + def create_env_rds_instance(self): """Creates the CAPE environment RDS instance.""" @@ -357,6 +445,19 @@ def create_workflow_meta_registry(self): ), ) + # read access to this resource can be configured via the deployment + # config (for api lambdas), so add it to the bookkeeping structure for + # that + self._exposed_env_vars.setdefault( + "WORKFLOW_REG_DDB_TABLE", + { + "resource_name": ( + self.workflow_meta_registry.workflow_meta_ddb_table.ddb_table.name + ), + "type": "table", + }, + ) + # TODO: ISSUE #126 # TODO: refactor out elsewhere def _deploy_static_app(self, sa_cfg: CapeConfig): diff --git a/requirements.txt b/requirements.txt index e172a62..7e64e78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -./assets/lambda-layers/capepy/capepy-2.2.0-py3-none-any.whl +./assets/lambda-layers/capepy/capepy-3.0.0-py3-none-any.whl boto3>=1.0.0,<2.0.0 boto3-stubs>=1.0.0,<2.0.0