
SparkSubmitHook Kerberos ccache support #11246

Closed
jaketf opened this issue Oct 2, 2020 · 2 comments · Fixed by #34386
Comments

@jaketf
Contributor

jaketf commented Oct 2, 2020

Description

It appears that SparkSubmitHook's Kerberos support is orthogonal to Airflow's own Kerberos support: it requires specifying a keytab in the task, which will not work for us because our worker container intentionally has no volume mount for the keytab. This means we would have to modify SparkSubmitHook/SparkSubmitOperator to use the ticket cache instead.

Use case / motivation

I want to use spark submit operator natively w/ ccache.

**Proposed Changes**

I think this should be the addition of a `use_krb5ccache` parameter to the hook and operator, defaulting to `False` for backwards compatibility.
When it is `True`, we should add the following to the spark-submit command construction:

if self.use_krb5ccache:
    if not os.getenv("KRB5CCNAME"):
        raise AirflowException(
            "KRB5CCNAME environment variable not set while trying to use a ticket from the ccache."
        )
    connection_cmd += ["--conf", "spark.kerberos.renewal.credentials=ccache"]

We should also fall back to the principal from the `[kerberos]` section of the Airflow config if it is not specified in the task:

self._principal = principal if principal else conf.get("kerberos", "principal")
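To make the proposal concrete, here is a standalone sketch of the flag logic as a plain function. The `build_kerberos_args` helper name is mine for illustration, and I use `RuntimeError` in place of Airflow's `AirflowException` so the sketch has no Airflow dependency:

```python
import os


def build_kerberos_args(use_krb5ccache: bool) -> list:
    """Hypothetical helper mirroring the proposed hook logic: when
    use_krb5ccache is True, require KRB5CCNAME to be set and emit the
    Spark conf that tells Spark to renew credentials from the ticket cache.
    """
    if not use_krb5ccache:
        return []
    if not os.getenv("KRB5CCNAME"):
        raise RuntimeError(
            "KRB5CCNAME environment variable not set while trying to "
            "use a ticket from the ccache."
        )
    return ["--conf", "spark.kerberos.renewal.credentials=ccache"]
```

In the real hook these arguments would be appended to `connection_cmd` during command construction; keeping the check in one place makes the KRB5CCNAME precondition easy to test.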

Note: I've tested something similar via a cluster policy as a workaround on my current project:

from airflow.configuration import conf
from airflow.models import BaseOperator
# import path assumes the apache-spark provider package is installed
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


def spark_tasks_use_ccache(task: BaseOperator):
    """Configure SparkSubmitOperator tasks to use the Kerberos ticket cache."""
    if isinstance(task, SparkSubmitOperator):
        # pylint: disable=protected-access
        if task._conf:  # noqa
            task._conf["spark.kerberos.renewal.credentials"] = "ccache"  # noqa
        task._principal = conf.get("kerberos", "principal")
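For anyone trying the same workaround: a cluster policy like this is picked up from `airflow_local_settings.py` on the scheduler's `PYTHONPATH`. A minimal sketch (config fragment, assuming the Airflow 2.x `task_policy` hook name; `spark_tasks_use_ccache` is the function above):

```python
# airflow_local_settings.py (must be importable by the scheduler)
from airflow.models import BaseOperator


def task_policy(task: BaseOperator) -> None:
    # Apply the ccache tweak to every SparkSubmitOperator task at parse time.
    spark_tasks_use_ccache(task)
```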

cc: @mik-laj @potiuk WDYT about contributing this sort of thing back to Airflow? Should we change this in the hook? Would it be interesting to contribute useful example cluster policies to Airflow?

@zeotuan
Contributor

zeotuan commented Sep 7, 2023

I can work on this issue.

@potiuk
Member

potiuk commented Sep 7, 2023

Feel free
