diff --git a/FlowCal/io.py b/FlowCal/io.py index 4d5cb2a..9326110 100644 --- a/FlowCal/io.py +++ b/FlowCal/io.py @@ -960,6 +960,7 @@ def __repr__(self): 'amplification_type', 'detector_voltage', 'amplifier_gain', + 'channel_labels', 'range', 'resolution']) @@ -1015,6 +1016,8 @@ class FCSData(np.ndarray): Get the detector voltage used for the specified channel(s). amplifier_gain Get the amplifier gain used for the specified channel(s). + channel_labels + Get the label of the specified channel(s). range Get the range of the specified channel(s). resolution @@ -1331,6 +1334,42 @@ def amplifier_gain(self, channels=None): else: return self._amplifier_gain[channels] + def channel_labels(self, channels=None): + """ + Get the label of the specified channel(s). + + The label for channel "n" is extracted from the $PnS + parameter, if available. + + Parameters + ---------- + channels : int, str, list of int, list of str + Channel(s) for which to get the label. If None, + return a list with the label of all channels, in the + order of ``FCSData.channels``. + + Return + ------ + str or list of str + The label of the specified channel(s). If no + information about the label is found for a channel, + return None. + + """ + # Check default + if channels is None: + channels = self._channels + + # Get numerical indices of channels + channels = self._name_to_index(channels) + + # Get the label of the specified channels + if hasattr(channels, '__iter__') \ + and not isinstance(channels, six.string_types): + return [self._channel_labels[ch] for ch in channels] + else: + return self._channel_labels[channels] + def range(self, channels=None): """ Get the range of the specified channel(s). @@ -1708,6 +1747,13 @@ def __new__(cls, infile): amplifier_gain.append(channel_amp_gain) amplifier_gain = tuple(amplifier_gain) + # Channel label: Stored in the keyword parameter $PnS for channel n. + channel_labels = [] + for i in range(1, num_channels + 1): + channel_label = fcs_file.text.get('$P{}S'.format(i), None) + channel_labels.append(channel_label) + channel_labels = tuple(channel_labels) + # Get data from fcs_file object, and change writeable flag. data = fcs_file.data data.flags.writeable = True @@ -1729,6 +1775,7 @@ def __new__(cls, infile): obj._amplification_type = amplification_type obj._detector_voltage = detector_voltage obj._amplifier_gain = amplifier_gain + obj._channel_labels = channel_labels obj._range = data_range obj._resolution = resolution @@ -1771,6 +1818,8 @@ def __array_finalize__(self, obj): self._detector_voltage = copy.deepcopy(obj._detector_voltage) if hasattr(obj, '_amplifier_gain'): self._amplifier_gain = copy.deepcopy(obj._amplifier_gain) + if hasattr(obj, '_channel_labels'): + self._channel_labels = copy.deepcopy(obj._channel_labels) if hasattr(obj, '_range'): self._range = copy.deepcopy(obj._range) if hasattr(obj, '_resolution'): @@ -1824,6 +1873,7 @@ def __reduce__(self): amplification_type = self._amplification_type, detector_voltage = self._detector_voltage, amplifier_gain = self._amplifier_gain, + channel_labels = self._channel_labels, range = self._range, resolution = self._resolution) @@ -1885,6 +1935,7 @@ def __setstate__(self, state): self._amplification_type = fcsdata_state.amplification_type self._detector_voltage = fcsdata_state.detector_voltage self._amplifier_gain = fcsdata_state.amplifier_gain + self._channel_labels = fcsdata_state.channel_labels self._range = fcsdata_state.range self._resolution = fcsdata_state.resolution @@ -2105,6 +2156,8 @@ def __getitem__(self, key): [new_arr._detector_voltage[kc] for kc in key_channel]) new_arr._amplifier_gain = tuple( [new_arr._amplifier_gain[kc] for kc in key_channel]) + new_arr._channel_labels = tuple( + [new_arr._channel_labels[kc] for kc in key_channel]) new_arr._range = \ [new_arr._range[kc] for kc in key_channel] new_arr._resolution = tuple(\ @@ -2117,6 +2170,8 @@ def __getitem__(self, key): new_arr._detector_voltage[key_channel] new_arr._amplifier_gain = \ new_arr._amplifier_gain[key_channel] + new_arr._channel_labels = \ + new_arr._channel_labels[key_channel] new_arr._range = \ new_arr._range[key_channel] new_arr._resolution = \ @@ -2129,6 +2184,8 @@ def __getitem__(self, key): tuple([new_arr._detector_voltage[key_channel]]) new_arr._amplifier_gain = \ tuple([new_arr._amplifier_gain[key_channel]]) + new_arr._channel_labels = \ + tuple([new_arr._channel_labels[key_channel]]) new_arr._range = \ [new_arr._range[key_channel]] new_arr._resolution = \ diff --git a/test/Data005.fcs b/test/Data005.fcs new file mode 100644 index 0000000..13d6f5d Binary files /dev/null and b/test/Data005.fcs differ diff --git a/test/test_io.py b/test/test_io.py index 32a2153..b015191 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -24,11 +24,13 @@ - Data003.fcs: FCS 3.0 from FlowJo Collectors Edition 7.5 / BD FACScan Flow Cytometer - Data004.fcs: FCS 3.0 including floating-point data + - Data005.fcs: FCS 3.0 including $PnS value """ filenames = ['test/Data001.fcs', 'test/Data002.fcs', 'test/Data003.fcs', 'test/Data004.fcs', + 'test/Data005.fcs', ] class TestFCSDataLoading(unittest.TestCase): @@ -2301,6 +2303,146 @@ def test_nondefault_nbins_many_5(self): scale='linear'), [bins1, np.arange(262145) - 0.5, bins2]) +class TestFCSAttributesChannelLabels(unittest.TestCase): + """ + Test correct extraction, functioning, and slicing of channel label. + + We have previously looked at the contents of the $PnS attribute for + the test files and identified the correct label(marker): + - Data001.fcs: [None, None, None, None, None, None] + - Data002.fcs: [None, None, None, None, None, None, None, None, None] + - Data003.fcs: [None, None, None, None, None, None, None, None] + - Data004.fcs: [None, None, None, None, None, None, None, + None, None, None, None, None, None, None] + - Data005.fcs: ['209Bi_CD11b', '140Ce', '142Ce', 'Center', '161Dy_CD56', + '162Dy_gdTCR', '163Dy_CRTH2', '164Dy_CLEC12A', '166Er_CD25', + '167Er_CCR7', '168Er_CD3', '170Er_CD38', '151Eu_CD123', + '153Eu_PD-1', 'Event_length', '155Gd_CD27', '156Gd_CCR5', + '157Gd', '158Gd_CD117', '160Gd_CD14', '165Ho_CCR6', '127I', + '113In_CD57', '115In_CD11c', '191Ir', '193Ir', '175Lu', '176Lu', + '93Nb', '142Nd_CD19', '143Nd_CD45RA', '144Nd_CD141', '145Nd_CD4', + '146Nd_CD8', '148Nd_CD16', '150Nd_CD1c', 'Offset', '192Os', + '206Pb', '102Pd', '104Pd', '105Pd', '106Pd', '108Pd', '110Pd', + '141Pr_CD33', '194Pt', '195Pt', '196Pt', '198Pt', 'Residual', + '103Rh', '147Sm_CD20', '149Sm_CD127', '152Sm_CD66b', '154Sm_CD86', + '181Ta', '159Tb_CD24', '169Tm_CX3CR1', '131Xe', '89Y_CD45', + '171Yb_CD161', '172Yb_CD209', '173Yb_CXCR3', '174Yb_HLADR', + '176Yb_CCR4', None, None, 'Time']) + + """ + def setUp(self): + self.d = [FlowCal.io.FCSData(filenames[i]) for i in range(5)] + + + def test_attribute(self): + """ + Testing correct reporting of channel_labels. + + """ + self.assertListEqual( + self.d[0].channel_labels(), + [None, None, None, None, None, 'Time (204.80 sec.)']) + self.assertListEqual( + self.d[1].channel_labels(), + [None, None, None, None, None, None, None, None, None]) + self.assertListEqual( + self.d[2].channel_labels(), + [None, None, None, None, None, None, None, None]) + self.assertListEqual( + self.d[3].channel_labels(), + [None, None, None, None, None, None, None, + None, None, None, None, None, None, None]) + self.assertListEqual( + self.d[4].channel_labels(), + ['209Bi_CD11b', '140Ce', '142Ce', 'Center', '161Dy_CD56', + '162Dy_gdTCR', '163Dy_CRTH2', '164Dy_CLEC12A', '166Er_CD25', + '167Er_CCR7', '168Er_CD3', '170Er_CD38', '151Eu_CD123', + '153Eu_PD-1', 'Event_length', '155Gd_CD27', '156Gd_CCR5', + '157Gd', '158Gd_CD117', '160Gd_CD14', '165Ho_CCR6', '127I', + '113In_CD57', '115In_CD11c', '191Ir', '193Ir', '175Lu', '176Lu', + '93Nb', '142Nd_CD19', '143Nd_CD45RA', '144Nd_CD141', '145Nd_CD4', + '146Nd_CD8', '148Nd_CD16', '150Nd_CD1c', 'Offset', '192Os', + '206Pb', '102Pd', '104Pd', '105Pd', '106Pd', '108Pd', '110Pd', + '141Pr_CD33', '194Pt', '195Pt', '196Pt', '198Pt', 'Residual', + '103Rh', '147Sm_CD20', '149Sm_CD127', '152Sm_CD66b', '154Sm_CD86', + '181Ta', '159Tb_CD24', '169Tm_CX3CR1', '131Xe', '89Y_CD45', + '171Yb_CD161', '172Yb_CD209', '173Yb_CXCR3', '174Yb_HLADR', + '176Yb_CCR4', None, None, 'Time']) + + def test_attribute_single(self): + """ + Testing correct reporting of label_channels for a single channel. + + """ + self.assertEqual(self.d[0].channel_labels('FSC-H'), None) + self.assertEqual(self.d[1].channel_labels('FITC-A'), None) + self.assertEqual(self.d[2].channel_labels('SSC'), None) + self.assertEqual(self.d[3].channel_labels('GFP-A'), None) + self.assertEqual(self.d[4].channel_labels('Bi209Di'), '209Bi_CD11b') + + def test_attribute_many(self): + """ + Testing correct reporting of channel_labels for many channels. + + """ + self.assertListEqual( + self.d[0].channel_labels(['SSC-H', 'FL2-H', 'FL3-H']), + [None, None, None]) + self.assertListEqual( + self.d[1].channel_labels(['FITC-A', 'PE-A', 'PE-Cy7-A']), + [None, None, None]) + self.assertListEqual( + self.d[2].channel_labels(['FSC', 'SSC', 'TIME']), + [None, None, None]) + self.assertListEqual( + self.d[3].channel_labels(['FSC PMT-A', 'FSC PMT-H', 'FSC PMT-W']), + [None, None, None]) + self.assertListEqual( + self.d[4].channel_labels(['Bi209Di', 'Ce140Di', 'Ce142Di']), + ['209Bi_CD11b', '140Ce', '142Ce']) + + def test_slice_single_str(self): + """ + Testing correct reporting of channel_labels after slicing. + + """ + self.assertListEqual( + self.d[0][:, 'FSC-H'].channel_labels(), + [None,]) + self.assertListEqual( + self.d[1][:, 'FITC-A'].channel_labels(), + [None,]) + self.assertListEqual( + self.d[2][:, 'SSC'].channel_labels(), + [None,]) + self.assertListEqual( + self.d[3][:, 'GFP-A'].channel_labels(), + [None,]) + self.assertListEqual( + self.d[4][:, 'Bi209Di'].channel_labels(), + ['209Bi_CD11b',]) + + def test_slice_many_str(self): + """ + Testing correct reporting of channel_labels after slicing. + + """ + self.assertListEqual( + self.d[0][:, ['SSC-H', 'FL2-H', 'FL3-H']].channel_labels(), + [None, None, None]) + self.assertListEqual( + self.d[1][:, ['FITC-A', 'PE-A', 'PE-Cy7-A']].channel_labels(), + [None, None, None]) + self.assertListEqual( + self.d[2][:, ['FSC', 'SSC', 'TIME']].channel_labels(), + [None, None, None]) + self.assertListEqual( + self.d[3][:,['FSC PMT-A', 'FSC PMT-H', 'FSC PMT-W']].channel_labels(), + [None, None, None]) + self.assertListEqual( + self.d[4][:,['Bi209Di', 'Ce140Di', 'Ce142Di']].channel_labels(), + ['209Bi_CD11b', '140Ce', '142Ce']) + class TestFCSAttributes(unittest.TestCase): def setUp(self): self.d = FlowCal.io.FCSData(filenames[0]) @@ -2789,10 +2931,21 @@ def test_pickle_unpickle(self): loaded_attrs = set(dir(test_fcs)) self.assertEqual(attrs, loaded_attrs) # Check contents of attrs affected by pickling - pickle_sensitive_attrs = ['_resolution', '_range', '_amplifier_gain', - '_detector_voltage', '_amplification_type', '_channels', - '_acquisition_end_time', '_acquisition_start_time', '_time_step', - '_data_type', '_analysis', '_text', '_infile'] + pickle_sensitive_attrs = [ + '_resolution', + '_range', + '_amplifier_gain', + '_channel_labels', + '_detector_voltage', + '_amplification_type', + '_channels', + '_acquisition_end_time', + '_acquisition_start_time', + '_time_step', + '_data_type', + '_analysis', + '_text', + '_infile'] # Also test computed (property) attrs computed_attrs = [ 'infile', @@ -2810,6 +2963,7 @@ def test_pickle_unpickle(self): 'amplification_type', 'detector_voltage', 'amplifier_gain', + 'channel_labels', 'range', 'resolution', 'hist_bins',