Apache Airflow is an open-source tool for creating, scheduling, and monitoring workflows. It is widely used for data pipelines, as it integrates with many services, including AWS, Apache Spark, Slack, and more. It also offers an easy way to visualize and follow your workflow step-by-step through its user interface.
Airflow has improved a lot across versions. However, some vulnerabilities remain, and one of them might expose sensitive data you wish were hidden.
Note: Images and code snippets presented in this article refer to Airflow version 2.3.3, but the solution works for previous versions as well. Airflow UI images are from a local Airflow instance.
In Airflow’s base code you will find features to mask sensitive data:
```
# A comma-separated list of extra sensitive keywords to look for in variables names or
# connection's extra JSON.
sensitive_var_conn_names =
```
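Airflow already masks fields whose names contain default keywords such as "password" and "secret"; `sensitive_var_conn_names` lets you extend that list. As a sketch, to also treat any variable or connection field containing "cassandra_pwd" (a hypothetical keyword) as sensitive, you could add it to the `[core]` section of airflow.cfg, or set the equivalent `AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES` environment variable:

```
[core]
sensitive_var_conn_names = cassandra_pwd
```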
There will be scenarios where you want to run all kinds of jobs and applications, and for security reasons, access to these tools is restricted through password authentication.
Let us say you want to run a Spark job on AWS EMR to manage data in your Cassandra database. One approach is to use EmrAddStepsOperator and pass Cassandra's password through the steps argument.
Note: the following code is just an example. Do not hard code your password. Use an Airflow variable for that.
```python
AddEMRSteps = EmrAddStepsOperator(
    task_id="AddEMRSteps",
    job_flow_id="myFlowID",
    aws_conn_id="myAWSConnID",
    steps=[
        {
            "Name": "emr_steps",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--master", "yarn",
                    "--driver-memory", "8G",
                    "--executor-memory", "10G",
                    "--executor-cores", "1",
                    "--num-executors", "2",
                    "--conf", "spark.cassandra.connection.host=myHost",
                    "--conf", "spark.cassandra.connection.port=myPort",
                    "--conf", "spark.cassandra.auth.username=myUsername",
                    "--conf", "spark.cassandra.auth.password=myPassword123",
                ],
            },
        }
    ],
)
```
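As the note above says, the password should come from an Airflow Variable rather than being hard-coded. One way to sketch that approach is to factor the argument list into a plain function and inject the secret when the DAG is parsed. The variable name `cassandra_password` is a hypothetical example, and the `Variable.get` call is commented out so the snippet runs outside a live Airflow instance:

```python
def build_spark_submit_args(host, port, username, password):
    """Assemble the spark-submit arguments for the EMR step."""
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--conf", f"spark.cassandra.connection.host={host}",
        "--conf", f"spark.cassandra.connection.port={port}",
        "--conf", f"spark.cassandra.auth.username={username}",
        "--conf", f"spark.cassandra.auth.password={password}",
    ]

# In a real DAG, fetch the secret from an Airflow Variable instead:
# from airflow.models import Variable
# password = Variable.get("cassandra_password")  # hypothetical variable name
args = build_spark_submit_args("myHost", "myPort", "myUsername", "myPassword123")
```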
But when you deploy your DAG and access the Task Instance Details page (click on your task, then Instance Details), this is what Airflow prints under Task Attributes.
Not only that, but if you access your task’s Rendered Template tab, your password will also be visible:
For any operator whose arguments are printed on these pages (e.g., EmrCreateJobFlowOperator, EmrAddStepsOperator, and even PythonOperator), all your passwords will be displayed in plain sight for anyone with access to that Airflow instance, as these pages only require the minimum access role, Viewer.
The file responsible for displaying that information is airflow/www/views.py. If you want to find this file on your machine/container, run the command:

```
find / -name views.py
```
The content of this file changes depending on what version of Airflow you are using. However, this solution works regardless of that.
To mask passwords in the Task Instance Details page, apply the following changes in views.py:

```python
from airflow.utils.log.secrets_masker import get_sensitive_variables_fields

for sensitive_key in get_sensitive_variables_fields():
    # Match quoted occurrences of the sensitive keyword followed by a value,
    # e.g. 'my.password=secret' or "password": "secret"
    pattern = fr"((\"|\')[a-z0-9_\.]*{sensitive_key}[a-z0-9_\.]*(\"|\'|\=|:|\s)+(\"|\')?.*?(\"|\'))"
    for attr_name in dir(task):
        if not attr_name.startswith('_'):
            attr = getattr(task, attr_name)
            regex_matches = re.findall(pattern, str(attr), re.IGNORECASE)
            if regex_matches:
                for match in regex_matches:
                    case_1 = re.findall(r":", match[0], re.IGNORECASE)
                    if case_1:
                        attr = re.sub(
                            re.escape(match[0]),
                            f"{match[0].split(':')[0]}:*******",
                            str(attr),
                        )
                    else:
                        attr = re.sub(
                            re.escape(match[0]),
                            f"{match[0].split('=')[0]}=*******'",
                            str(attr),
                        )
                try:
                    setattr(task, attr_name, attr)
                except Exception as e:
                    logging.error(f"Failed to mask password. Error: {e}")
```
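To see what that regex actually matches, here is a self-contained sketch (independent of Airflow) that applies the same pattern and substitution logic to a string resembling `str(task.steps)`; all names and values are illustrative:

```python
import re

# The masking pattern from the views.py patch, with "password" as the keyword.
sensitive_key = "password"
pattern = fr"((\"|\')[a-z0-9_\.]*{sensitive_key}[a-z0-9_\.]*(\"|\'|\=|:|\s)+(\"|\')?.*?(\"|\'))"

# A sample attribute string, mimicking part of the EMR steps argument.
attr = "['--conf', 'spark.cassandra.auth.password=myPassword123']"

for match in re.findall(pattern, attr, re.IGNORECASE):
    if ":" in match[0]:
        # key and value separated by a colon, e.g. "password": "secret"
        attr = re.sub(re.escape(match[0]), f"{match[0].split(':')[0]}:*******", attr)
    else:
        # key and value separated by "=", e.g. password=secret
        attr = re.sub(re.escape(match[0]), f"{match[0].split('=')[0]}=*******'", attr)

print(attr)  # the password value is replaced by asterisks
```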
And to mask passwords in the Rendered Template tab:

```python
from airflow.utils.log.secrets_masker import get_sensitive_variables_fields

for template_field in task.template_fields:
    content = getattr(task, template_field)
    renderer = task.template_fields_renderers.get(template_field, template_field)
    for sensitive_key in get_sensitive_variables_fields():
        pattern = fr"((\"|\')[a-z0-9_\.]*{sensitive_key}[a-z0-9_\.]*(\"|\'|\=|:|\s)+(\"|\')?.*?(\"|\'))"
        regex_matches = re.findall(pattern, str(content), re.IGNORECASE)
        if regex_matches:
            for match in regex_matches:
                case_1 = re.findall(r":", match[0], re.IGNORECASE)
                if case_1:
                    content = re.sub(
                        re.escape(match[0]),
                        f"{match[0].split(':')[0]}:*******",
                        str(content),
                    )
                else:
                    content = re.sub(
                        re.escape(match[0]),
                        f"{match[0].split('=')[0]}=*******'",
                        str(content),
                    )
    if renderer in renderers:
        if isinstance(content, (dict, list)):
            json_content = json.dumps(content, sort_keys=True, indent=4)
            html_dict[template_field] = renderers[renderer](json_content)
        else:
            html_dict[template_field] = renderers[renderer](content)
    else:
        html_dict[template_field] = Markup("<pre><code>{}</pre></code>").format(pformat(content))
    if isinstance(content, dict):
        if template_field == 'op_kwargs':
            for key, value in content.items():
                renderer = task.template_fields_renderers.get(key, key)
                if renderer in renderers:
                    html_dict['.'.join([template_field, key])] = renderers[renderer](value)
                else:
                    html_dict['.'.join([template_field, key])] = Markup(
                        "<pre><code>{}</pre></code>"
                    ).format(pformat(value))
        else:
            for dict_keys in get_key_paths(content):
                template_path = '.'.join((template_field, dict_keys))
                renderer = task.template_fields_renderers.get(template_path, template_path)
                if renderer in renderers:
                    content_value = get_value_from_path(dict_keys, content)
                    html_dict[template_path] = renderers[renderer](content_value)
```
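`get_key_paths` and `get_value_from_path` are helpers already defined in views.py. For readers unfamiliar with them, here are simplified standalone sketches (my approximation, not the exact Airflow source) showing what they do with a nested dict like the EMR steps:

```python
def get_key_paths(input_dict):
    """Yield a dot-separated path for every leaf in a nested dict."""
    for key, value in input_dict.items():
        if isinstance(value, dict):
            for sub_key in get_key_paths(value):
                yield '.'.join((key, sub_key))
        else:
            yield key

def get_value_from_path(key_path, content):
    """Follow a dot-separated key path through a nested dict."""
    elem = content
    for key in key_path.split('.'):
        elem = elem.get(key)
    return elem

steps = {"Name": "emr_steps", "HadoopJarStep": {"Args": "spark-submit ..."}}
paths = list(get_key_paths(steps))
value = get_value_from_path("HadoopJarStep.Args", steps)
```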
Note: if you want to add more sensitive words besides "password", change Airflow's configuration file, as shown in the first section.
The solution presented in this article is a better and safer approach than configuring each DAG individually. This way, you will not have to remember to mask your passwords every time you write a new DAG.
I will open a pull request to Airflow’s repository in order to have this issue fixed. This way you won’t have to configure each Airflow instance manually.