[BEAM-10601] DICOM API Beam IO connector #12331

Merged 44 commits on Aug 3, 2020
Changes from 1 commit
2fa1e78
First commit, after modifying codes based on design doc feedbacks 7/20
George-Wu Jul 21, 2020
816c63a
fix some comments
George-Wu Jul 21, 2020
66fcda5
Merge pull request #1 from George-Wu/working
George-Wu Jul 21, 2020
29f7b02
fix style and add license
George-Wu Jul 21, 2020
7401988
Merge pull request #2 from George-Wu/working
George-Wu Jul 21, 2020
e5d825a
fix style lint
George-Wu Jul 21, 2020
e32bf7e
Merge pull request #3 from George-Wu/working
George-Wu Jul 21, 2020
fae3482
minor fix
George-Wu Jul 22, 2020
0de3c2c
add pagination support
George-Wu Jul 22, 2020
30e8ca1
add file path support to storeinstance
George-Wu Jul 23, 2020
a0e1bcb
Merge pull request #4 from George-Wu/work2
George-Wu Jul 23, 2020
5ba1a9b
fix some typos
George-Wu Jul 23, 2020
8986c5a
Merge pull request #5 from George-Wu/work2
George-Wu Jul 23, 2020
432480b
removed path support and added fileio supports
George-Wu Jul 24, 2020
9289cf5
Merge pull request #6 from George-Wu/work2
George-Wu Jul 24, 2020
192ac8d
fix bug in client
George-Wu Jul 24, 2020
045669b
add unit tests
George-Wu Jul 27, 2020
46ff235
Merge pull request #7 from George-Wu/work2
George-Wu Jul 27, 2020
60f96aa
Update dicomio_test.py
George-Wu Jul 27, 2020
a1621cb
fix patching
George-Wu Jul 27, 2020
05d1ee2
remove non-Non-ASCII character
George-Wu Jul 27, 2020
803cfcb
add google.auth support and fix client
George-Wu Jul 27, 2020
122383d
try inject dependency
George-Wu Jul 28, 2020
b1f8e9e
roll back injection
George-Wu Jul 28, 2020
fb42e31
add dependency
George-Wu Jul 28, 2020
abf7600
change place to inject
George-Wu Jul 28, 2020
5d664eb
change the order
George-Wu Jul 28, 2020
04d37cd
fix typos and pydocs
George-Wu Jul 29, 2020
4dde591
Merge branch 'master' of github.com:apache/beam into master
George-Wu Jul 29, 2020
fb9bbd4
fix style
George-Wu Jul 29, 2020
3690b10
fix annoying style
George-Wu Jul 29, 2020
21fd738
Add concurrent support
George-Wu Jul 30, 2020
51294ea
fixed bugs and docs style, added custom client supports, timestamp re…
George-Wu Jul 31, 2020
73ffeb9
fix py2 support issues
George-Wu Jul 31, 2020
593692d
fix some minor bugs
George-Wu Jul 31, 2020
ad6c49e
fix style and modify tests
George-Wu Jul 31, 2020
a19ecd1
fix format
George-Wu Jul 31, 2020
a2706dd
fix test skip
George-Wu Jul 31, 2020
b5a018b
Merge branch 'master' into master
George-Wu Aug 3, 2020
37375db
Update sdks/python/apache_beam/io/gcp/dicomio.py
George-Wu Aug 3, 2020
91d9516
Update sdks/python/apache_beam/io/gcp/dicomio.py
George-Wu Aug 3, 2020
ff2fc3c
Update sdks/python/apache_beam/io/gcp/dicomio.py
George-Wu Aug 3, 2020
ff07e98
Update sdks/python/apache_beam/io/gcp/dicomio.py
George-Wu Aug 3, 2020
0ccc2c5
function name change
George-Wu Aug 3, 2020
176 changes: 94 additions & 82 deletions sdks/python/apache_beam/io/gcp/dicomio.py
@@ -15,18 +15,18 @@
# limitations under the License.
#

"""DICOM io connector
This module implements serval tools to facilitate the interaction between
a Google Cloud Healthcare DICOM store and a beam pipeline.
"""DICOM IO connector
This module implements several tools to facilitate the interaction between
a Google Cloud Healthcare DICOM store and a Beam pipeline.
For more details on DICOM store and API:
https://cloud.google.com/healthcare/docs/how-tos/dicom
DICOM io connector can be used to search metadata or store DICOM files.
When used together with Google Pubsub message connector, a PTransform
implemented in this module can be used to convert pubsub messages to search
requests. Since Traceability is crucial for healthcare API users, every
input or error message will be recorded in the output of the DICOM io
connector. As a result, every PTransform in this module will return a
Pcollection of dict that encodes results and detailed error messages.
The DICOM IO connector can be used to search metadata or write DICOM files
to DICOM store. When used together with Google Pubsub message connector,
a PTransform implemented in this module can be used to convert pubsub
messages to search requests. Since Traceability is crucial for healthcare
API users, every input or error message will be recorded in the output of
the DICOM IO connector. As a result, every PTransform in this module will
return a PCollection of dict that encodes results and detailed error messages.

Search instance's metadata (QIDO request)
===================================================
@@ -36,15 +36,16 @@
the 'params' entry. Here is a sample usage:

with Pipeline() as p:
input_dict = p | beam.Create([
{'project_id': 'abc123', 'type': 'instances',...},
{'project_id': 'dicom_go', 'type': 'series',...}
])
results = input_dict| io.gcp.DicomSearch()
input_dict = p | beam.Create(
[{'project_id': 'abc123', 'type': 'instances',...},
{'project_id': 'dicom_go', 'type': 'series',...}])

results = input_dict | io.gcp.DicomSearch()
results | 'print successful search' >> beam.Map(
lambda x: print(x['result'] if x['success'] else None))
lambda x: print(x['result'] if x['success'] else None))

results | 'print failed search' >> beam.Map(
lambda x: print(x['result'] if not x['success'] else None))
lambda x: print(x['result'] if not x['success'] else None))

In the example above, successful qido search results and error messages for
failed requests are printed. When used in real life, user can choose to filter
@@ -60,12 +61,12 @@

pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=pipeline_options) as p:
pubsub = p | beam.io.ReadStringFromPubsub(subscription='a_dicom_store')
results = pubsub | PubsubToQido()
success = results | 'filter message' >> beam.Filter(lambda x: x['success'])
qido_dict = success | 'get qido request' >> beam.Map(lambda x: x['result'])
metadata = qido_dict | DicomSearch()
p = beam.Pipeline(options=pipeline_options)
pubsub = p | beam.io.ReadStringFromPubsub(subscription='a_dicom_store')
results = pubsub | PubsubToQido()
success = results | 'filter message' >> beam.Filter(lambda x: x['success'])
qido_dict = success | 'get qido request' >> beam.Map(lambda x: x['result'])
metadata = qido_dict | DicomSearch()

In the example above, the pipeline is listening to a pubsub topic and waiting
for messages from DICOM API. When a new DICOM file comes into the storage, the
@@ -114,55 +115,60 @@

class DicomSearch(PTransform):
"""A PTransform used for retrieving DICOM instance metadata from Google
Cloud DICOM store. It takes a Pcollection of dicts as input and return
a Pcollection of dict as results:
Cloud DICOM store. It takes a PCollection of dicts as input and return
a PCollection of dict as results:
INPUT:
The input dict represents DICOM web path parameters, which has the following
string keys and values:
{
'project_id': str,
'region': str,
'dataset_id': str,
'dicom_store_id': str,
'search_type': str,
'params': dict(str,str) (Optional),
'project_id': str,
'region': str,
'dataset_id': str,
'dicom_store_id': str,
'search_type': str,
'params': dict(str,str) (Optional),
}

Key-value pairs:
project_id: Id of the project in which DICOM store locates. (Required)
project_id: Id of the project in which the DICOM store is
located. (Required)
region: Region where the DICOM store resides. (Required)
dataset_id: Id of the dataset where DICOM store belongs to. (Required)
dicom_store_id: Id of the dicom store. (Required)
search_type: Which type of search it is, could only be one of the three
values: 'instances', 'series', or 'studies'. (Required)
values: 'instances', 'series', or 'studies'. (Required)
params: A dict of str:str pairs used to refine QIDO search. (Optional)
Supported tags in three categories:
1. Studies:
StudyInstanceUID
PatientName
PatientID
AccessionNumber
ReferringPhysicianName
StudyDate
2. Series: all study level search terms and
SeriesInstanceUID
Modality
3. Instances: all study/series level search terms and
SOPInstanceUID
e.g. {"StudyInstanceUID":"1","SeriesInstanceUID":"2"}
Supported tags in three categories:
1.Studies:
StudyInstanceUID,
PatientName,
PatientID,
AccessionNumber,
ReferringPhysicianName,
StudyDate,
2.Series: all study level search terms and
SeriesInstanceUID,
Modality,
3.Instances: all study/series level search terms and
SOPInstanceUID,

e.g. {"StudyInstanceUID":"1","SeriesInstanceUID":"2"}

OUTPUT:
The output dict wraps results as well as error messages:
{
'result': a list of dicts in JSON style.
'success': boolean value telling whether the operation is successful.
'input': detail ids and dicomweb path for this retrieval.
'status': status code from the server, used as error message.
'result': a list of dicts in JSON style.
'success': boolean value telling whether the operation is successful.
'input': detail ids and dicomweb path for this retrieval.
'status': status code from the server, used as error message.
}

"""
def __init__(self, credential=None):
"""Initializes DicomSearch.
Args:
credential: # type: Google credential object, if it is specified, the
Http client will use it to create sessions instead of the default.
Http client will use it to create sessions instead of the default.
"""
self.credential = credential
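
To make the DicomSearch input contract above concrete, here is a minimal standalone sketch, not code from this PR, of how a search dict might be validated and turned into a dicomWeb search path. The helper name `build_qido_path` is hypothetical. The path layout is assumed from the Pubsub message format quoted in the PubsubToQido docstring, with `region` filling the LOCATION segment, and the error wording is borrowed from the "Must have dataset_id in the dict." check in dicomio_test.py.

```python
# Hypothetical helper: validates the documented keys and builds a search path.
REQUIRED_KEYS = (
    'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type')
SEARCH_TYPES = ('instances', 'series', 'studies')


def build_qido_path(search_input):
  """Return the dicomWeb search path for a DicomSearch-style input dict."""
  for key in REQUIRED_KEYS:
    if key not in search_input:
      raise ValueError('Must have %s in the dict.' % key)
  if search_input['search_type'] not in SEARCH_TYPES:
    raise ValueError(
        "search_type must be one of 'instances', 'series', or 'studies'.")
  # Assumed layout; extra keys such as 'params' are ignored by format().
  return (
      'projects/{project_id}/locations/{region}/datasets/{dataset_id}/'
      'dicomStores/{dicom_store_id}/dicomWeb/{search_type}'
  ).format(**search_input)


print(build_qido_path({
    'project_id': 'abc123',
    'region': 'us-central1',
    'dataset_id': 'ds',
    'dicom_store_id': 'store',
    'search_type': 'instances',
    'params': {'StudyInstanceUID': '1'},
}))
```

The optional `params` entry is left out of the path here; in a real request it would presumably be sent as query parameters.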

@@ -226,27 +232,29 @@ def process(self, element):

class PubsubToQido(PTransform):
"""A PTransform for converting pubsub messages into search input dict.
Takes Pcollection of string as input and returns a Pcollection of dict as
Takes PCollection of string as input and returns a PCollection of dict as
results. Note that some pubsub messages may not be from DICOM API, which
will be recorded as failed conversions.
INPUT:
The input are normally strings from Pubsub topic:
"projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/
dicomStores/DICOM_STORE_ID/dicomWeb/studies/STUDY_UID/
series/SERIES_UID/instances/INSTANCE_UID"
"projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/
dicomStores/DICOM_STORE_ID/dicomWeb/studies/STUDY_UID/
series/SERIES_UID/instances/INSTANCE_UID"

OUTPUT:
The output dict encodes results as well as error messages:
{
'result': a dict representing instance level qido search request.
'success': boolean value telling whether the conversion is successful.
'input': input pubsub message string.
'result': a dict representing instance level qido search request.
'success': boolean value telling whether the conversion is successful.
'input': input pubsub message string.
}

"""
def __init__(self, credential=None):
"""Initializes PubsubToQido.
Args:
credential: # type: Google credential object, if it is specified, the
Http client will use it instead of the default one.
Http client will use it instead of the default one.
"""
self.credential = credential
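
The conversion described in the PubsubToQido docstring can be illustrated with a small standalone sketch. This is not the PR's implementation: the function name `pubsub_message_to_qido`, the exact keys of the resulting request dict, and the choice of `None` as the failed `result` are assumptions, modeled on the DicomSearch input format and the documented output keys.

```python
# Fixed path segments expected at these positions in a DICOM API message.
EXPECTED_SEGMENTS = {
    0: 'projects', 2: 'locations', 4: 'datasets', 6: 'dicomStores',
    8: 'dicomWeb', 9: 'studies', 11: 'series', 13: 'instances',
}


def pubsub_message_to_qido(message):
  """Convert a Pubsub message string into an instance-level search dict."""
  parts = message.split('/')
  well_formed = (
      len(parts) == 15 and all(parts) and
      all(parts[i] == name for i, name in EXPECTED_SEGMENTS.items()))
  if not well_formed:
    # Messages that are not from the DICOM API are recorded as failures.
    return {'result': None, 'success': False, 'input': message}
  request = {
      'project_id': parts[1],
      'region': parts[3],
      'dataset_id': parts[5],
      'dicom_store_id': parts[7],
      'search_type': 'instances',
      'params': {
          'StudyInstanceUID': parts[10],
          'SeriesInstanceUID': parts[12],
          'SOPInstanceUID': parts[14],
      },
  }
  return {'result': request, 'success': True, 'input': message}


msg = (
    'projects/abc123/locations/us-central1/datasets/ds/dicomStores/store/'
    'dicomWeb/studies/1.2/series/3.4/instances/5.6')
print(pubsub_message_to_qido(msg)['success'])
print(pubsub_message_to_qido('not a dicom message')['success'])
```

Keeping the original message under `input` in both branches matches the traceability goal stated in the module docstring.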

@@ -324,41 +332,45 @@ def process(self, element):
return [out]


class DicomStoreInstance(PTransform):
class WriteToDicomStore(PTransform):
"""A PTransform for storing instances to a DICOM store.
Takes Pcollection of byte[] as input and return a Pcollection of dict as
Takes PCollection of byte[] as input and return a PCollection of dict as
results. The inputs are normally DICOM file in bytes or str filename.
INPUT:
This PTransform supports two types of input:
1. Byte[]: representing dicom file.
2. Fileio object: stream file object.
This PTransform supports two types of input:
1. Byte[]: representing dicom file.
2. Fileio object: stream file object.

OUTPUT:
The output dict encodes status as well as error messages:
{
'success': boolean value telling whether the store is successful
'input': undeliverable data. Exactly the same as the input,
only set if the operation is failed.
'status': status code from the server, used as error messages.
'success': boolean value telling whether the store is successful.
'input': undeliverable data. Exactly the same as the input,
only set if the operation is failed.
'status': status code from the server, used as error messages.
}

"""
def __init__(self, destination_dict, input_type, credential=None):
"""Initializes DicomStoreInstance.
"""Initializes WriteToDicomStore.
Args:
destination_dict: # type: python dict, encodes DICOM endpoint information:
{
'project_id': str,
'region': str,
'dataset_id': str,
'dicom_store_id': str,
}
Key-value pairs:
project_id: Id of the project in which DICOM store locates. (Required)
region: Region where the DICOM store resides. (Required)
dataset_id: Id of the dataset where DICOM store belongs to. (Required)
dicom_store_id: Id of the dicom store. (Required)
{
'project_id': str,
'region': str,
'dataset_id': str,
'dicom_store_id': str,
}

Key-value pairs:
project_id: Id of the project in which DICOM store locates. (Required)
region: Region where the DICOM store resides. (Required)
dataset_id: Id of the dataset where DICOM store belongs to. (Required)
dicom_store_id: Id of the dicom store. (Required)

input_type: # type: string, could only be 'bytes' or 'fileio'
credential: # type: Google credential object, if it is specified, the
Http client will use it instead of the default one.
Http client will use it instead of the default one.
"""
self.credential = credential
self.destination_dict = destination_dict
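
Because the WriteToDicomStore output only carries `input` when a store fails, a downstream step could use it as a retry payload. The sketch below is a plain-Python illustration of that idea, assuming the output dict shape documented above; the helper name `split_write_results` and the sample status values are hypothetical, and in a pipeline the same split would more likely be done with `beam.Filter` on `x['success']`.

```python
def split_write_results(results):
  """Separate successful stores from failures that may need a retry."""
  stored = [r for r in results if r['success']]
  # 'input' is only set on failure, so it doubles as the retry payload.
  retry = [(r['status'], r['input']) for r in results if not r['success']]
  return stored, retry


# Sample output dicts; the status codes are made up for illustration.
sample = [
    {'success': True, 'input': None, 'status': 200},
    {'success': False, 'input': b'not-a-dicom-file', 'status': 400},
]
stored, retry = split_write_results(sample)
print(len(stored), len(retry))
```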
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/gcp/dicomio_test.py
@@ -35,7 +35,7 @@
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.dicomio import DicomSearch
from apache_beam.io.gcp.dicomio import DicomStoreInstance
from apache_beam.io.gcp.dicomio import WriteToDicomStore
from apache_beam.io.gcp.dicomio import PubsubToQido
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
@@ -289,7 +289,7 @@ def test_store_byte_file(self, FakeClient):
results = (
p
| beam.Create([bytes_input])
| DicomStoreInstance(input_dict, 'bytes')
| WriteToDicomStore(input_dict, 'bytes')
| beam.Map(lambda x: x['success']))
assert_that(results, equal_to([True]))
self.assertTrue(dict_input in fc.dicom_metadata)
@@ -316,7 +316,7 @@ def test_store_fileio_file(self, FakeClient):
| beam.Create([FileSystems.join(temp_dir, '*')])
| fileio.MatchAll()
| fileio.ReadMatches()
| DicomStoreInstance(input_dict, 'fileio')
| WriteToDicomStore(input_dict, 'fileio')
| beam.Map(lambda x: x['success']))
assert_that(results, equal_to([True]))
self.assertTrue(dict_input in fc.dicom_metadata)
@@ -341,7 +341,7 @@ def test_destination_notfound(self, FakeClient):
results = (
p
| beam.Create([''])
| DicomStoreInstance(input_dict, 'bytes'))
| WriteToDicomStore(input_dict, 'bytes'))
assert_that(results, equal_to([expected_invalid_dict]))

@patch("apache_beam.io.gcp.dicomio.DicomApiHttpClient")
@@ -361,7 +361,7 @@ def test_missing_parameters(self, FakeClient):
with self.assertRaisesRegex(ValueError,
"Must have dataset_id in the dict."):
p = TestPipeline()
_ = (p | beam.Create(['']) | DicomStoreInstance(input_dict, 'bytes'))
_ = (p | beam.Create(['']) | WriteToDicomStore(input_dict, 'bytes'))


if __name__ == '__main__':