Skip to content

Commit

Permalink
[AIRFLOW-2794] Add WasbDeleteBlobOperator (apache#3961)
Browse files Browse the repository at this point in the history
Deleting Azure blob is now supported. Either single blobs can be
deleted, or one can choose to supply a prefix, in which case one
can match multiple blobs to be deleted.
  • Loading branch information
bart-eijk authored and Alice Berard committed Jan 3, 2019
1 parent 8a4027f commit 5347289
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 1 deletion.
41 changes: 41 additions & 0 deletions airflow/contrib/hooks/wasb_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.
#

from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook

from azure.storage.blob import BlockBlobService
Expand Down Expand Up @@ -148,3 +149,43 @@ def read_file(self, container_name, blob_name, **kwargs):
return self.connection.get_blob_to_text(container_name,
blob_name,
**kwargs).content

def delete_file(self, container_name, blob_name, is_prefix=False,
ignore_if_missing=False, **kwargs):
"""
Delete a file from Azure Blob Storage.
:param container_name: Name of the container.
:type container_name: str
:param blob_name: Name of the blob.
:type blob_name: str
:param is_prefix: If blob_name is a prefix, delete all matching files
:type is_prefix: bool
:param ignore_if_missing: if True, then return success even if the
blob does not exist.
:type ignore_if_missing: bool
:param kwargs: Optional keyword arguments that
`BlockBlobService.create_blob_from_path()` takes.
:type kwargs: object
"""

if is_prefix:
blobs_to_delete = [
blob.name for blob in self.connection.list_blobs(
container_name, prefix=blob_name, **kwargs
)
]
elif self.check_for_blob(container_name, blob_name):
blobs_to_delete = [blob_name]
else:
blobs_to_delete = []

if not ignore_if_missing and len(blobs_to_delete) == 0:
raise AirflowException('Blob(s) not found: {}'.format(blob_name))

for blob_uri in blobs_to_delete:
self.log.info("Deleting blob: " + blob_uri)
self.connection.delete_blob(container_name,
blob_uri,
delete_snapshots='include',
**kwargs)
71 changes: 71 additions & 0 deletions airflow/contrib/operators/wasb_delete_blob_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class WasbDeleteBlobOperator(BaseOperator):
"""
Deletes blob(s) on Azure Blob Storage.
:param container_name: Name of the container. (templated)
:type container_name: str
:param blob_name: Name of the blob. (templated)
:type blob_name: str
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
:param check_options: Optional keyword arguments that
`WasbHook.check_for_blob()` takes.
:param is_prefix: If blob_name is a prefix, delete all files matching prefix.
:type is_prefix: bool
:param ignore_if_missing: if True, then return success even if the
blob does not exist.
:type ignore_if_missing: bool
"""

template_fields = ('container_name', 'blob_name')

@apply_defaults
def __init__(self, container_name, blob_name,
wasb_conn_id='wasb_default', check_options=None,
is_prefix=False, ignore_if_missing=False,
*args,
**kwargs):
super(WasbDeleteBlobOperator, self).__init__(*args, **kwargs)
if check_options is None:
check_options = {}
self.wasb_conn_id = wasb_conn_id
self.container_name = container_name
self.blob_name = blob_name
self.check_options = check_options
self.is_prefix = is_prefix
self.ignore_if_missing = ignore_if_missing

def execute(self, context):
self.log.info(
'Deleting blob: {self.blob_name}\n'
'in wasb://{self.container_name}'.format(**locals())
)
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)

hook.delete_file(self.container_name, self.blob_name,
self.is_prefix, self.ignore_if_missing,
**self.check_options)
56 changes: 55 additions & 1 deletion tests/contrib/hooks/test_wasb_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

import json
import unittest
from collections import namedtuple

from airflow import configuration
from airflow import configuration, AirflowException
from airflow import models
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.utils import db
Expand Down Expand Up @@ -143,6 +144,59 @@ def test_read_file(self, mock_service):
'container', 'blob', max_connections=1
)

@mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
autospec=True)
def test_delete_single_blob(self, mock_service):
mock_instance = mock_service.return_value
hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
hook.delete_file('container', 'blob', is_prefix=False)
mock_instance.delete_blob.assert_called_once_with(
'container', 'blob', delete_snapshots='include'
)

@mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
autospec=True)
def test_delete_multiple_blobs(self, mock_service):
mock_instance = mock_service.return_value
Blob = namedtuple('Blob', ['name'])
mock_instance.list_blobs.return_value = iter(
[Blob('blob_prefix/blob1'), Blob('blob_prefix/blob2')]
)
hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
hook.delete_file('container', 'blob_prefix', is_prefix=True)
mock_instance.delete_blob.assert_any_call(
'container', 'blob_prefix/blob1', delete_snapshots='include'
)
mock_instance.delete_blob.assert_any_call(
'container', 'blob_prefix/blob2', delete_snapshots='include'
)

@mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
autospec=True)
def test_delete_nonexisting_blob_fails(self, mock_service):
mock_instance = mock_service.return_value
mock_instance.exists.return_value = False
hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
with self.assertRaises(Exception) as context:
hook.delete_file(
'container', 'nonexisting_blob',
is_prefix=False, ignore_if_missing=False
)
self.assertIsInstance(context.exception, AirflowException)

@mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
autospec=True)
def test_delete_multiple_nonexisting_blobs_fails(self, mock_service):
mock_instance = mock_service.return_value
mock_instance.list_blobs.return_value = iter([])
hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
with self.assertRaises(Exception) as context:
hook.delete_file(
'container', 'nonexisting_blob_prefix',
is_prefix=True, ignore_if_missing=False
)
self.assertIsInstance(context.exception, AirflowException)


if __name__ == '__main__':
unittest.main()
91 changes: 91 additions & 0 deletions tests/contrib/operators/test_wasb_delete_blob_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import datetime
import unittest

from airflow import DAG, configuration
from airflow.contrib.operators.wasb_delete_blob_operator import WasbDeleteBlobOperator

try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None


class TestWasbDeleteBlobOperator(unittest.TestCase):

_config = {
'container_name': 'container',
'blob_name': 'blob',
}

def setUp(self):
configuration.load_test_config()
args = {
'owner': 'airflow',
'start_date': datetime.datetime(2017, 1, 1)
}
self.dag = DAG('test_dag_id', default_args=args)

def test_init(self):
operator = WasbDeleteBlobOperator(
task_id='wasb_operator',
dag=self.dag,
**self._config
)
self.assertEqual(operator.container_name,
self._config['container_name'])
self.assertEqual(operator.blob_name, self._config['blob_name'])
self.assertEqual(operator.is_prefix, False)
self.assertEqual(operator.ignore_if_missing, False)

operator = WasbDeleteBlobOperator(
task_id='wasb_operator',
dag=self.dag,
is_prefix=True,
ignore_if_missing=True,
**self._config
)
self.assertEqual(operator.is_prefix, True)
self.assertEqual(operator.ignore_if_missing, True)

@mock.patch('airflow.contrib.operators.wasb_delete_blob_operator.WasbHook',
autospec=True)
def test_execute(self, mock_hook):
mock_instance = mock_hook.return_value
operator = WasbDeleteBlobOperator(
task_id='wasb_operator',
dag=self.dag,
is_prefix=True,
ignore_if_missing=True,
**self._config
)
operator.execute(None)
mock_instance.delete_file.assert_called_once_with(
'container', 'blob', True, True
)


if __name__ == '__main__':
unittest.main()

0 comments on commit 5347289

Please sign in to comment.