diff --git a/onnxruntime/python/tools/quantization/base_quantizer.py b/onnxruntime/python/tools/quantization/base_quantizer.py
index f07fb30f10f82..6235db3234d49 100644
--- a/onnxruntime/python/tools/quantization/base_quantizer.py
+++ b/onnxruntime/python/tools/quantization/base_quantizer.py
@@ -19,7 +19,9 @@
 from .calibrate import TensorData
 from .onnx_model import ONNXModel
 from .quant_utils import (
+    DEQUANT_OP_NAME,
     ONNX_TYPE_TO_NP_TYPE,
+    QUANT_OP_NAME,
     TENSOR_NAME_QUANT_SUFFIX,
     find_by_name,
     model_has_infer_metadata,
@@ -178,6 +180,9 @@ def should_quantize_node(self, node):
         if node.op_type not in self.op_types_to_quantize:
             return False
 
+        if node.op_type in (DEQUANT_OP_NAME, QUANT_OP_NAME):
+            return False
+
         if self.nodes_to_exclude is not None and node.name in self.nodes_to_exclude:
             return False
 
diff --git a/onnxruntime/python/tools/quantization/qdq_quantizer.py b/onnxruntime/python/tools/quantization/qdq_quantizer.py
index 048c7f3296503..5552a4451c542 100644
--- a/onnxruntime/python/tools/quantization/qdq_quantizer.py
+++ b/onnxruntime/python/tools/quantization/qdq_quantizer.py
@@ -195,7 +195,11 @@ def __init__(
         # The default behavior is that multiple nodes can share a QDQ pair as their inputs.
         # In TRT, QDQ pair can`t be shared between nodes, so it will create dedicated QDQ pairs for each node.
         self.dedicated_qdq_pair = extra_options.get("DedicatedQDQPair", False)
-        self.tensor_to_its_receiving_nodes = {}
+        self.tensor_to_its_receiving_nodes: dict[str, list[onnx.NodeProto]] = {}
+
+        # Maps a tensor to the DequantizeLinear node (in the original input model) that outputs the tensor.
+        # Populated for input models with some pre-quantized weights (typically via a different tool).
+        self.tensor_to_producing_dq: dict[str, onnx.NodeProto] = {}
 
         # Let user set channel axis for specific op type and it's effective only when per channel quantization is supported and per_channel is True.
         self.qdq_op_type_per_channel_support_to_axis = extra_options.get("QDQOpTypePerChannelSupportToAxis", {})
@@ -555,6 +559,9 @@ def quantize_model(self):
                     if tensor_name not in self.tensor_to_its_receiving_nodes:
                         self.tensor_to_its_receiving_nodes[tensor_name] = []
                     self.tensor_to_its_receiving_nodes[tensor_name].append(node)
+            if node.op_type == DEQUANT_OP_NAME:
+                for tensor_name in node.output:
+                    self.tensor_to_producing_dq[tensor_name] = node
 
         self.initializer_quant_params = self._calc_initializer_quant_params()
         self._adjust_weight_quant_params_for_bias_tensors()
@@ -958,6 +965,14 @@ def _quantize_normal_tensors(self):
                 if initializer:
                     self._add_qdq_nodes_for_initializer(initializer)
                 else:
+                    # Check if this tensor is already a dequantized value. If so, skip it.
+                    # This happens if the original input model already has some pre-quantized weights
+                    # generated by a different tool.
+                    # Ex: (quantized_weight -> DequantizeLinear -> this_tensor)
+                    if tensor_name in self.tensor_to_producing_dq:
+                        del self.tensors_to_quantize[tensor_name]
+                        continue
+
                     tensor_qparam_initializers = self._make_tensor_scale_zp_initializers(tensor_name)
                     if not tensor_qparam_initializers:
                         raise ValueError(
@@ -1009,6 +1024,12 @@ def _quantize_sharing_param_tensors(self):
                     if self.is_input_a_initializer(tensor_name):
                         raise ValueError("Quantization parameter shared mode is not supported for weight yet")
 
+                    if tensor_name in self.tensor_to_producing_dq:
+                        raise ValueError(
+                            f"Quantization parameter sharing is invalid for tensor {tensor_name} "
+                            "because it has already been quantized"
+                        )
+
                     # Need to check if this tensor's quant_type is converted for some consumers.
                     # If so, create new scale/zp initializers for these consumers.
                     converted_qparam_inits = None
@@ -1147,6 +1168,30 @@ def is_tensor_per_channel(
 
         return True, axis
 
+    def _get_tensor_quantization_scale(self, tensor_name: str, consumer_node_name: str) -> np.ndarray | None:
+        """
+        Returns the quantization scale of a tensor that is consumed by the given node.
+        :parameter tensor_name: The name of the tensor.
+        :parameter consumer_node_name: The name of the node that consumes the tensor as input. Necessary in case
+                                       the quantization type of the tensor was converted.
+                                       Refer: QDQQuantizer::_add_qdq_ops_for_converted_activation.
+        :returns: The quantization scale or None.
+        """
+        initializers = self.model.initializer()
+        scale_initializer: onnx.TensorProto | None = None
+
+        if tensor_name in self.quantized_value_map:
+            # Tensor was quantized by this tool, so get scale from initializer created by this tool run.
+            scale_name = self.quantized_value_map[tensor_name].get_for_consumer(consumer_node_name).scale_name
+            scale_initializer = find_by_name(scale_name, initializers)
+        else:
+            # Tensor was already quantized in original model, so get scale from DQ node that outputs the tensor.
+            dq_node = self.tensor_to_producing_dq.get(tensor_name, None)
+            if dq_node:
+                scale_initializer = find_by_name(dq_node.input[1], initializers)
+
+        return tensor_proto_to_array(scale_initializer) if scale_initializer is not None else None
+
     def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
         """
         Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale
@@ -1156,17 +1201,21 @@ def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
         if bias_name in self.quantized_value_map:
             return self.quantized_value_map[bias_name].original.q_name
 
-        # get scale for weight
-        weight_scale_name = self.quantized_value_map[bias_info.weight_name].original.scale_name
-        weight_scale_initializer = find_by_name(weight_scale_name, self.model.initializer())
-        weight_scale = tensor_proto_to_array(weight_scale_initializer)
+        # get scale for weight.
+        weight_scale = self._get_tensor_quantization_scale(bias_info.weight_name, bias_info.node_name)
+        if weight_scale is None:
+            raise ValueError(
+                f"Unable to get valid quantization scale for weight input '{bias_info.weight_name}' "
+                f"when quantizing bias '{bias_name}' to int32."
+            )
 
-        # get scale for input
-        input_scale_name = (
-            self.quantized_value_map[bias_info.input_name].get_for_consumer(bias_info.node_name).scale_name
-        )
-        input_scale_initializer = find_by_name(input_scale_name, self.model.initializer())
-        input_scale = tensor_proto_to_array(input_scale_initializer)
+        # get scale for input.
+        input_scale = self._get_tensor_quantization_scale(bias_info.input_name, bias_info.node_name)
+        if input_scale is None:
+            raise ValueError(
+                f"Unable to get valid quantization scale for input '{bias_info.input_name}' "
+                f"when quantizing bias '{bias_name}' to int32."
+            )
 
         (
             quantized_bias_name,
diff --git a/onnxruntime/test/python/quantization/test_qdq.py b/onnxruntime/test/python/quantization/test_qdq.py
index 24039fe7398a8..23b397ffd80e1 100644
--- a/onnxruntime/test/python/quantization/test_qdq.py
+++ b/onnxruntime/test/python/quantization/test_qdq.py
@@ -20,10 +20,12 @@
     check_op_type_count,
     check_op_type_order,
     create_clip_node,
+    get_tensor_consumers_and_producers,
 )
 
 from onnxruntime.quantization import QDQQuantizer, QuantFormat, QuantType, quantize_static, write_calibration_table
 from onnxruntime.quantization.calibrate import CalibrationMethod, TensorData, TensorsData
+from onnxruntime.quantization.quant_utils import quantize_nparray
 
 
 class TestQDQFormat(unittest.TestCase):
@@ -1925,5 +1927,280 @@ def test_dup_shared_bias(self):
         self.assertEqual(len(bias_names), 2)
 
 
+class TestQDQPrequantWeights(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.prequant_weight")
+
+        # Note: swap with the commented line if you want to see the models in local test dir.
+        cls._tmp_dir_path = cls._tmp_model_dir.name
+        # cls._tmp_dir_path = "."
+
+    @classmethod
+    def tearDownClass(cls):
+        cls._tmp_model_dir.cleanup()
+
+    def build_conv_model(
+        self,
+        inp_shape: list[int],
+        weight_quant_data: np.ndarray,
+        weight_scale_data: np.ndarray,
+        weight_zp_data: np.ndarray,
+        bias_data: np.ndarray,
+        float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
+    ):
+        """
+        Builds a model with a Conv that has a pre-quantized constant weight input.
+        """
+        input_0 = onnx.helper.make_tensor_value_info("input_0", float_type, inp_shape)
+        output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
+        weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_quant")
+        weight_scale = onnx.numpy_helper.from_array(weight_scale_data, "weight_scale")
+        weight_zp = onnx.numpy_helper.from_array(weight_zp_data, "weight_zp")
+        bias = onnx.numpy_helper.from_array(bias_data, "bias")
+
+        dq_node = onnx.helper.make_node(
+            "DequantizeLinear", ["weight_quant", "weight_scale", "weight_zp"], ["weight_dequant"], name="DQ0"
+        )
+        conv_node = onnx.helper.make_node("Conv", ["input_0", "weight_dequant", "bias"], ["output_0"], name="Conv0")
+        graph = onnx.helper.make_graph(
+            [dq_node, conv_node],
+            "ConvPreQuantWeight",
+            [input_0],
+            [output_0],
+            initializer=[weight_quant, weight_scale, weight_zp, bias],
+        )
+        opset_imports = [onnx.helper.make_opsetid("", 21)]
+        model = onnx.helper.make_model(graph, opset_imports=opset_imports)
+
+        return onnx.shape_inference.infer_shapes(model)
+
+    def build_conv_dynamic_weight_model(
+        self,
+        input_quant_data: np.ndarray,
+        input_scale_data: np.ndarray,
+        input_zp_data: np.ndarray,
+        weight_shape: list[int],
+        bias_data: np.ndarray,
+        float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
+    ):
+        """
+        Builds a model with a Conv that has a dynamic float weight input, but a constant
+        pre-quantized input[0].
+        """
+        dyn_weight = onnx.helper.make_tensor_value_info("dyn_weight", float_type, weight_shape)
+        output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
+        input_quant = onnx.numpy_helper.from_array(input_quant_data, "input_quant")
+        input_scale = onnx.numpy_helper.from_array(input_scale_data, "input_scale")
+        input_zp = onnx.numpy_helper.from_array(input_zp_data, "input_zp")
+        bias = onnx.numpy_helper.from_array(bias_data, "bias")
+
+        dq_node = onnx.helper.make_node(
+            "DequantizeLinear", ["input_quant", "input_scale", "input_zp"], ["input_dequant"], name="DQ0"
+        )
+        conv_node = onnx.helper.make_node("Conv", ["input_dequant", "dyn_weight", "bias"], ["output_0"], name="Conv0")
+        graph = onnx.helper.make_graph(
+            [dq_node, conv_node],
+            "ConvPreQuantInput_DynamicWeight",
+            [dyn_weight],
+            [output_0],
+            initializer=[input_quant, input_scale, input_zp, bias],
+        )
+        opset_imports = [onnx.helper.make_opsetid("", 21)]
+        model = onnx.helper.make_model(graph, opset_imports=opset_imports)
+
+        return onnx.shape_inference.infer_shapes(model)
+
+    def test_quantize_with_prequantized_weights(self):
+        """
+        Test quantization of Conv with pre-quantized weights.
+        """
+        rng = np.random.default_rng(123)
+        test_configs = [onnx.TensorProto.FLOAT, onnx.TensorProto.FLOAT16]
+
+        for float_type in test_configs:
+            with self.subTest(float_type=float_type):
+                label = f"_{onnx.TensorProto.DataType.Name(float_type)}"
+                float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_weight{label}.onnx")
+                qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_weight{label}.qdq.onnx")
+
+                inp_shape = [1, 2, 100, 100]
+                weight_shape = [2, 2, 20, 20]
+                np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)
+
+                # range = 2.0, scale = 2/254, zp = 0
+                weight_scale_data = np.array(2 / 254, dtype=np_dtype)
+                weight_zp_data = np.array(0, dtype=np.int8)
+                weight_data = np.linspace(-1.0, 1.0, num=1600, dtype=np_dtype).reshape(weight_shape)
+                weight_quant_data = quantize_nparray(
+                    onnx.TensorProto.INT8, weight_data, weight_scale_data, weight_zp_data
+                )
+
+                bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
+                float_model = self.build_conv_model(
+                    inp_shape, weight_quant_data, weight_scale_data, weight_zp_data, bias_data, float_type
+                )
+
+                onnx.checker.check_model(float_model, True)
+                onnx.save_model(float_model, float_model_path)
+
+                # Check that the input model only has a pre-quantized weight and save its scale/zero-point
+                # to check that it doesn't change after quantization.
+                float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
+                check_op_type_count(self, float_model_path, **float_node_counts)
+                conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
+                self.assertNotEqual(conv_node_original, None)
+
+                _, producers_original = get_tensor_consumers_and_producers(float_model)
+                weight_dq_node_original = producers_original.get(conv_node_original.input[1], None)
+                initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
+                scale_name_original = weight_dq_node_original.input[1]
+                scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
+                zp_name_original = weight_dq_node_original.input[2]
+                zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])
+
+                input_data_list = [
+                    {"input_0": rng.uniform(-10.0, 10.0, inp_shape).astype(np_dtype)},
+                ]
+                data_reader = TestDataFeeds(input_data_list)
+
+                quantize_static(
+                    float_model_path,
+                    qdq_model_path,
+                    data_reader,
+                    quant_format=QuantFormat.QDQ,
+                    activation_type=QuantType.QUInt8,
+                    weight_type=QuantType.QInt8,
+                    op_types_to_quantize=["Conv"],
+                )
+
+                # The final model should have everything quantized
+                qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
+                check_op_type_count(self, qdq_model_path, **qdq_node_counts)
+
+                # Check that the pre-quantized weight still has the same scale/zp after quantization
+                qdq_model = onnx.load_model(qdq_model_path)
+                conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
+                self.assertNotEqual(conv_node, None)
+
+                _, producers = get_tensor_consumers_and_producers(qdq_model)
+                weight_dq_node = producers.get(conv_node.input[1], None)
+                initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
+
+                scale_name = weight_dq_node.input[1]
+                self.assertEqual(scale_name, scale_name_original)
+                scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
+                self.assertEqual(scale_val, scale_val_original)
+
+                zp_name = weight_dq_node.input[2]
+                self.assertEqual(zp_name, zp_name_original)
+                zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
+                self.assertEqual(zp_val, zp_val_original)
+
+    def test_quantize_with_prequantized_input(self):
+        """
+        Test quantization of Conv with pre-quantized input and dynamic weight.
+        """
+        rng = np.random.default_rng(123)
+        test_configs = [
+            (onnx.TensorProto.FLOAT, False),
+            (onnx.TensorProto.FLOAT16, False),
+            (onnx.TensorProto.FLOAT, True),
+            (onnx.TensorProto.FLOAT16, True),
+        ]
+
+        for float_type, convert_weight_qtype in test_configs:
+            with self.subTest(float_type=float_type):
+                convert_label = "_convert_qtype" if convert_weight_qtype else ""
+                label = f"_{onnx.TensorProto.DataType.Name(float_type)}{convert_label}"
+                float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_input{label}.onnx")
+                qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_input{label}.qdq.onnx")
+
+                inp_shape = [1, 2, 40, 40]
+                weight_shape = [2, 2, 20, 20]
+                np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)
+
+                # range = 3.0, scale = 3/255, zp = 127
+                input_scale_data = np.array(3 / 255, dtype=np_dtype)
+                input_zp_data = np.array(127, dtype=np.uint8)
+                input_data = np.linspace(-1.5, 1.5, num=3200, dtype=np_dtype).reshape(inp_shape)
+                input_quant_data = quantize_nparray(onnx.TensorProto.UINT8, input_data, input_scale_data, input_zp_data)
+
+                bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
+                float_model = self.build_conv_dynamic_weight_model(
+                    input_quant_data, input_scale_data, input_zp_data, weight_shape, bias_data, float_type
+                )
+
+                onnx.checker.check_model(float_model, True)
+                onnx.save_model(float_model, float_model_path)
+
+                # Check that the input model only has a pre-quantized input and save its scale/zero-point
+                # to check that it doesn't change after quantization.
+                float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
+                check_op_type_count(self, float_model_path, **float_node_counts)
+                conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
+                self.assertNotEqual(conv_node_original, None)
+
+                _, producers_original = get_tensor_consumers_and_producers(float_model)
+                input_dq_node_original = producers_original.get(conv_node_original.input[0], None)
+                initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
+                scale_name_original = input_dq_node_original.input[1]
+                scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
+                zp_name_original = input_dq_node_original.input[2]
+                zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])
+
+                # Create data reader with random input calibration data.
+                dyn_weight_data_list = [
+                    {"dyn_weight": rng.uniform(-10.0, 10.0, weight_shape).astype(np_dtype)},
+                ]
+                data_reader = TestDataFeeds(dyn_weight_data_list)
+
+                extra_options = {}
+                if convert_weight_qtype:
+                    # Test converting the dynamic weight's quantization type, which results in
+                    # dyn_weight -> Q(u16) -> DQ(f32) -> Q(u8) -> DQ(f32) -> Conv
+                    extra_options["TensorQuantOverrides"] = {
+                        "dyn_weight": [{"quant_type": QuantType.QUInt16, "convert": {"quant_type": QuantType.QUInt8}}],
+                    }
+
+                quantize_static(
+                    float_model_path,
+                    qdq_model_path,
+                    data_reader,
+                    quant_format=QuantFormat.QDQ,
+                    activation_type=QuantType.QUInt8,
+                    weight_type=QuantType.QInt8,
+                    op_types_to_quantize=["Conv"],
+                    extra_options=extra_options,
+                )
+
+                # The final model should have everything quantized
+                qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
+                if convert_weight_qtype:
+                    qdq_node_counts["QuantizeLinear"] += 1
+                    qdq_node_counts["DequantizeLinear"] += 1
+
+                check_op_type_count(self, qdq_model_path, **qdq_node_counts)
+
+                # Check that the pre-quantized input still has the same scale/zp after quantization
+                qdq_model = onnx.load_model(qdq_model_path)
+                conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
+                self.assertNotEqual(conv_node, None)
+
+                _, producers = get_tensor_consumers_and_producers(qdq_model)
+                input_dq_node = producers.get(conv_node.input[0], None)
+                initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
+
+                scale_name = input_dq_node.input[1]
+                self.assertEqual(scale_name, scale_name_original)
+                scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
+                self.assertEqual(scale_val, scale_val_original)
+
+                zp_name = input_dq_node.input[2]
+                self.assertEqual(zp_name, zp_name_original)
+                zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
+                self.assertEqual(zp_val, zp_val_original)
+
+
 if __name__ == "__main__":
     unittest.main()
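
For reference, the following is a condensed, hypothetical usage sketch of the scenario this change handles, not part of the diff above: a Conv whose weight already arrives as an int8 initializer feeding a DequantizeLinear. With this change, quantize_static leaves the existing weight DQ (and its scale/zero-point) untouched and reuses that scale when quantizing the bias to int32. The helper names make_prequant_conv_model and RandomDataReader are illustrative only.

import numpy as np
import onnx
from onnxruntime.quantization import CalibrationDataReader, QuantFormat, QuantType, quantize_static


def make_prequant_conv_model(path: str) -> None:
    # weight_quant (int8) -> DequantizeLinear -> Conv, with a float bias.
    w_q = onnx.numpy_helper.from_array(np.ones((2, 2, 3, 3), dtype=np.int8), "weight_quant")
    w_s = onnx.numpy_helper.from_array(np.array(0.02, dtype=np.float32), "weight_scale")
    w_z = onnx.numpy_helper.from_array(np.array(0, dtype=np.int8), "weight_zp")
    bias = onnx.numpy_helper.from_array(np.zeros(2, dtype=np.float32), "bias")
    dq = onnx.helper.make_node(
        "DequantizeLinear", ["weight_quant", "weight_scale", "weight_zp"], ["weight_f"], name="DQ0"
    )
    conv = onnx.helper.make_node("Conv", ["input_0", "weight_f", "bias"], ["output_0"], name="Conv0")
    graph = onnx.helper.make_graph(
        [dq, conv],
        "ConvPreQuantWeight",
        [onnx.helper.make_tensor_value_info("input_0", onnx.TensorProto.FLOAT, [1, 2, 8, 8])],
        [onnx.helper.make_tensor_value_info("output_0", onnx.TensorProto.FLOAT, None)],
        initializer=[w_q, w_s, w_z, bias],
    )
    model = onnx.helper.make_model(graph, opset_imports=[onnx.helper.make_opsetid("", 21)])
    onnx.save_model(model, path)


class RandomDataReader(CalibrationDataReader):
    # Minimal calibration reader: a few random activations for "input_0".
    def __init__(self):
        rng = np.random.default_rng(0)
        self._data = iter(
            [{"input_0": rng.uniform(-1.0, 1.0, (1, 2, 8, 8)).astype(np.float32)} for _ in range(4)]
        )

    def get_next(self):
        return next(self._data, None)


make_prequant_conv_model("conv.prequant_weight.onnx")
quantize_static(
    "conv.prequant_weight.onnx",
    "conv.prequant_weight.qdq.onnx",
    RandomDataReader(),
    quant_format=QuantFormat.QDQ,
    activation_type=QuantType.QUInt8,
    weight_type=QuantType.QInt8,
    op_types_to_quantize=["Conv"],
)
# The weight's original DequantizeLinear scale/zero-point are preserved; only the
# activations and the bias receive new Q/DQ nodes.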