diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index e37d6ebe693..a9e6566c632 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -17213,9 +17213,9 @@ dates or timestamps, or for a lack of type coercion support.
 <td> </td>
 <td><b>NS</b></td>
 <td> </td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py
index c155ba955dc..7da779c4143 100644
--- a/integration_tests/src/main/python/orc_write_test.py
+++ b/integration_tests/src/main/python/orc_write_test.py
@@ -20,10 +20,24 @@
 from marks import *
 from pyspark.sql.types import *
 
-orc_write_gens_list = [
-        [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
-            string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
-            TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
+orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
+        string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
+        TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]
+
+orc_write_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_write_basic_gens)])
+
+orc_write_struct_gens_sample = [orc_write_basic_struct_gen,
+    StructGen([['child0', byte_gen], ['child1', orc_write_basic_struct_gen]]),
+    StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
+orc_write_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_write_basic_gens] + [
+    ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
+    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
+    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]
+
+orc_write_gens_list = [orc_write_basic_gens,
+        orc_write_struct_gens_sample,
+        orc_write_array_gens_sample,
         pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
         pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]
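The new struct and array generator lists above exercise nested ORC writes end to end. As a rough illustration only (not part of the patch; column names and the output path are made up), the kind of write these cases drive through Spark looks like:

```scala
// Minimal sketch of a nested ORC write, mirroring what the new test cases exercise.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, struct}

object NestedOrcWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("nested-orc-write").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 1.0f, "a"), (2, 2.0f, "b")).toDF("id", "x", "s")
      .withColumn("point", struct($"id", $"x")) // STRUCT column
      .withColumn("tags", array($"s"))          // ARRAY column

    // With the plugin enabled, this write is now eligible to run on the GPU.
    df.write.mode("overwrite").orc("/tmp/nested_orc_write_example")
    spark.stop()
  }
}
```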
 
diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala
index 9d6c92a264a..613c7fcf290 100644
--- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala
+++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetCachedBatchSerializer.scala
@@ -435,8 +435,8 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer with Arm {
       table: Table,
       schema: StructType): ParquetBufferConsumer = {
     val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
-    val opts = GpuParquetFileFormat
-        .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
+    val opts = SchemaUtils
+        .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
         .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build()
     withResource(Table.writeParquetChunked(opts, buffer)) { writer =>
       writer.write(table)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
index 0f676203be8..33c29a6673d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -21,11 +21,8 @@ import java.util.Random
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{
-  ColumnView, CompressionType, DType, HostBufferConsumer, HostMemoryBuffer,
-  ParquetColumnWriterOptions, ParquetWriterOptions, Table, TableWriter
-}
-import ai.rapids.cudf.ParquetColumnWriterOptions.{listBuilder, structBuilder, NestedBuilder}
+import ai.rapids.cudf._
+import ai.rapids.cudf.ColumnWriterOptions._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -144,8 +141,8 @@ private class ColumnIndex() {
 object ParquetDumper extends Arm {
   val COMPRESS_TYPE = CompressionType.SNAPPY
 
-  def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+  def parquetWriterOptionsFromTable[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
       table: Table): T = {
 
     val cIndex = new ColumnIndex
@@ -159,8 +156,8 @@ object ParquetDumper extends Arm {
   }
 
   private def parquetWriterOptionsFromColumnView[T <: NestedBuilder[_, _],
-    V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+    V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
       cv: ColumnView,
       cIndex: ColumnIndex,
       toClose: ArrayBuffer[ColumnView]): T = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 8fc7c86e98b..8dd0c323f5b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -852,7 +852,8 @@ object GpuOverrides extends Logging {
     (OrcFormatType, FileFormatChecks(
       cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.DECIMAL_64 +
           TypeSig.STRUCT + TypeSig.MAP).nested(),
-      cudfWrite = TypeSig.commonCudfTypes,
+      cudfWrite = (TypeSig.commonCudfTypes + TypeSig.ARRAY +
+          TypeSig.STRUCT).nested(),
       sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
           TypeSig.UDT).nested())))
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index 89a305fa1d4..d679af86ea7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -17,7 +17,6 @@
 package com.nvidia.spark.rapids
 
 import ai.rapids.cudf._
-import ai.rapids.cudf.ParquetColumnWriterOptions._
 import com.nvidia.spark.RebaseHelper
 import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
 import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
@@ -33,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DateType, Decimal, DecimalType, MapType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object GpuParquetFileFormat {
@@ -117,68 +116,6 @@ object GpuParquetFileFormat {
     }
   }
 
-  def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
-      dataType: DataType,
-      name: String,
-      writeInt96: Boolean,
-      nullable: Boolean): T = {
-    dataType match {
-      case dt: DecimalType =>
-        builder.withDecimalColumn(name, dt.precision, nullable)
-      case TimestampType =>
-        builder.withTimestampColumn(name, writeInt96, nullable)
-      case s: StructType =>
-        builder.withStructColumn(
-          parquetWriterOptionsFromSchema(
-            // we are setting this to nullable, in case the parent is a Map's key and wants to
-            // set this to false
-            structBuilder(name, nullable),
-            s,
-            writeInt96).build())
-      case a: ArrayType =>
-        builder.withListColumn(
-          parquetWriterOptionsFromField(
-            // we are setting this to nullable, in case the parent is a Map's key and wants to
-            // set this to false
-            listBuilder(name, nullable),
-            a.elementType,
-            name,
-            writeInt96,
-            true).build())
-      case m: MapType =>
-        builder.withMapColumn(
-          mapColumn(name,
-            parquetWriterOptionsFromField(
-              ParquetWriterOptions.builder(),
-              m.keyType,
-              "key",
-              writeInt96,
-              false).build().getChildColumnOptions()(0),
-            parquetWriterOptionsFromField(
-              ParquetWriterOptions.builder(),
-              m.valueType,
-              "value",
-              writeInt96,
-              nullable).build().getChildColumnOptions()(0)))
-      case _ =>
-        builder.withColumns(nullable, name)
-    }
-    builder.asInstanceOf[T]
-  }
-
-  def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
-      schema: StructType,
-      writeInt96: Boolean): T = {
-    // TODO once https://github.com/rapidsai/cudf/issues/7654 is fixed go back to actually
-    // setting if the output is nullable or not everywhere we have hard-coded nullable=true
-    schema.foreach(field =>
-      parquetWriterOptionsFromField(builder, field.dataType, field.name, writeInt96, true)
-    )
-    builder.asInstanceOf[T]
-  }
-
   def parseCompressionType(compressionType: String): Option[CompressionType] = {
     compressionType match {
       case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
@@ -401,8 +338,8 @@ class GpuParquetWriter(
 
   override val tableWriter: TableWriter = {
     val writeContext = new ParquetWriteSupport().init(conf)
-    val builder = GpuParquetFileFormat
-      .parquetWriterOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
+    val builder = SchemaUtils
+      .writerOptionsFromSchema(ParquetWriterOptions.builder(), dataSchema,
         ParquetOutputTimestampType.INT96 == SQLConf.get.parquetOutputTimestampType)
       .withMetadata(writeContext.getExtraMetaData)
       .withCompressionType(compressionType)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
index 1531ff440b7..e535d1d9f9a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
@@ -21,7 +21,8 @@ import java.util.Optional
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 
-import ai.rapids.cudf.{ColumnView, Table}
+import ai.rapids.cudf._
+import ai.rapids.cudf.ColumnWriterOptions._
 import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
 import org.apache.orc.TypeDescription
 
@@ -208,4 +209,70 @@ object SchemaUtils extends Arm {
       case _ => col
     }
   }
+
+  private def writerOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: NestedBuilder[T, V],
+      dataType: DataType,
+      name: String,
+      nullable: Boolean,
+      writeInt96: Boolean): T = {
+    dataType match {
+      case dt: DecimalType =>
+        builder.withDecimalColumn(name, dt.precision, nullable)
+      case TimestampType =>
+        builder.withTimestampColumn(name, writeInt96, nullable)
+      case s: StructType =>
+        builder.withStructColumn(
+          writerOptionsFromSchema(
+            structBuilder(name, nullable),
+            s,
+            writeInt96).build())
+      case a: ArrayType =>
+        builder.withListColumn(
+          writerOptionsFromField(
+            listBuilder(name, nullable),
+            a.elementType,
+            name,
+            a.containsNull,
+            writeInt96).build())
+      case m: MapType =>
+        // It is OK to use `structBuilder` here for both the key and the value, since
+        // `ORCWriterOptions.Builder` and `ParquetWriterOptions.Builder` are both
+        // `AbstractStructBuilder`s, and this code only handles the common column metadata.
+        builder.withMapColumn(
+          mapColumn(name,
+            writerOptionsFromField(
+              structBuilder(name, nullable),
+              m.keyType,
+              "key",
+              nullable = false,
+              writeInt96).build().getChildColumnOptions()(0),
+            writerOptionsFromField(
+              structBuilder(name, nullable),
+              m.valueType,
+              "value",
+              m.valueContainsNull,
+              writeInt96).build().getChildColumnOptions()(0)))
+      case _ =>
+        builder.withColumns(nullable, name)
+    }
+    builder.asInstanceOf[T]
+  }
+
+  /**
+   * Build writer options from a Spark schema, for both the ORC and the Parquet writers.
+   *
+   * (cudf issue https://github.com/rapidsai/cudf/issues/7654 affected the Parquet writer, but
+   * it is worked around by https://github.com/rapidsai/cudf/pull/9061, so nullability can now
+   * follow the actual schema setting instead of the previously hard-coded nullable=true.)
+   */
+  def writerOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: NestedBuilder[T, V],
+      schema: StructType,
+      writeInt96: Boolean = false): T = {
+    schema.foreach(field =>
+      writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96)
+    )
+    builder.asInstanceOf[T]
+  }
 }
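To make the refactor concrete, here is a minimal REPL-style sketch of the new helper (the nested schema is made up for illustration): each field's nullability, and each array's `containsNull` / map's `valueContainsNull`, is translated into the corresponding cudf `ColumnWriterOptions` node.

```scala
import ai.rapids.cudf.{CompressionType, ParquetWriterOptions}
import com.nvidia.spark.rapids.SchemaUtils
import org.apache.spark.sql.types._

// Hypothetical nested schema: a non-nullable id, an array of structs, and a map.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("points", ArrayType(StructType(Seq(
    StructField("x", DoubleType),
    StructField("y", DoubleType))))),
  StructField("attrs", MapType(StringType, StringType))))

// Nullability and nesting come straight from the Spark schema.
val opts = SchemaUtils
  .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false)
  .withCompressionType(CompressionType.SNAPPY)
  .build()
```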
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
index c4f0f95b43b..3baa6ca2386 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcUtils}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 object GpuOrcFileFormat extends Logging {
   // The classname used when Spark is configured to use the Hive implementation for ORC.
@@ -161,18 +161,9 @@ class GpuOrcWriter(path: String,
   extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {
 
   override val tableWriter: TableWriter = {
-    val builder= ORCWriterOptions.builder()
+    val builder = SchemaUtils
+      .writerOptionsFromSchema(ORCWriterOptions.builder(), dataSchema)
       .withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf)))
-
-    dataSchema.foreach(entry => {
-      if (entry.nullable) {
-        builder.withColumnNames(entry.name)
-      } else {
-        builder.withNotNullableColumnNames(entry.name)
-      }
-    })
-
-    val options = builder.build()
-    Table.writeORCChunked(options, this)
+    Table.writeORCChunked(builder.build(), this)
   }
 }
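For symmetry, the ORC writer now goes through the same helper. A rough sketch of building the options by hand (schema and compression choice are illustrative; `writeInt96` defaults to `false`):

```scala
import ai.rapids.cudf.{CompressionType, ORCWriterOptions}
import com.nvidia.spark.rapids.SchemaUtils
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("names", ArrayType(StringType)),
  StructField("point", StructType(Seq(
    StructField("x", DoubleType),
    StructField("y", DoubleType))))))

// Same helper as the Parquet path, just with the ORC builder.
val orcOpts = SchemaUtils
  .writerOptionsFromSchema(ORCWriterOptions.builder(), schema)
  .withCompressionType(CompressionType.SNAPPY)
  .build()
```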
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala
index a1a51944f1a..beec3b4c6d4 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/collectionOperations.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids
 import scala.collection.mutable.ArrayBuffer
 
 import ai.rapids.cudf
-import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetColumnWriterOptions, ParquetWriterOptions, Scalar}
+import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetWriterOptions, Scalar}
 import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuListUtils, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression}
 import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
 import com.nvidia.spark.rapids.RapidsPluginImplicits._