diff --git a/CHANGES.md b/CHANGES.md
index 17ac8a1d7010..523ef3455aec 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
 
 ## New Features / Improvements
 
@@ -107,6 +108,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
 * state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).
 * Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md).
 
+
 ## Breaking Changes
 
 * `org.apache.beam.sdk.io.CountingSource.CounterMark` uses custom `CounterMarkCoder` as a default coder since all Avro-dependent
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 33beff23b311..2c7a4fc5d4f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -191,6 +191,7 @@ public static Read read() {
     return new AutoValue_TextIO_Read.Builder()
         .setCompression(Compression.AUTO)
         .setHintMatchesManyFiles(false)
+        .setSkipHeaderLines(0)
         .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .build();
   }
@@ -214,6 +215,7 @@ public static Read read() {
   public static ReadAll readAll() {
     return new AutoValue_TextIO_ReadAll.Builder()
         .setCompression(Compression.AUTO)
+        .setSkipHeaderLines(0)
         .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .build();
   }
@@ -228,6 +230,7 @@ public static ReadFiles readFiles() {
         // but is not so large as to exhaust a typical runner's maximum amount of output per
         // ProcessElement call.
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setSkipHeaderLines(0)
         .build();
   }
 
@@ -286,6 +289,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<String>
     @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
     abstract byte @Nullable [] getDelimiter();
 
+    abstract int getSkipHeaderLines();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -300,6 +305,8 @@ abstract static class Builder {
 
       abstract Builder setDelimiter(byte @Nullable [] delimiter);
 
+      abstract Builder setSkipHeaderLines(int skipHeaderLines);
+
       abstract Read build();
     }
 
@@ -396,6 +403,10 @@ public Read withDelimiter(byte[] delimiter) {
       return toBuilder().setDelimiter(delimiter).build();
     }
 
+    public Read withSkipHeaderLines(int skipHeaderLines) {
+      return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
+    }
+
     static boolean isSelfOverlapping(byte[] s) {
       // s self-overlaps if v exists such as s = vu = wv with u and w non empty
       for (int i = 1; i < s.length - 1; ++i) {
@@ -422,7 +433,9 @@ public PCollection<String> expand(PBegin input) {
               FileIO.readMatches()
                   .withCompression(getCompression())
                   .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+          .apply(
+              "Via ReadFiles",
+              readFiles().withDelimiter(getDelimiter()).withSkipHeaderLines(getSkipHeaderLines()));
     }
 
     // Helper to create a source specific to the requested compression type.
@@ -431,7 +444,8 @@ protected FileBasedSource<String> getSource() {
               new TextSource(
                   getFilepattern(),
                   getMatchConfiguration().getEmptyMatchTreatment(),
-                  getDelimiter()))
+                  getDelimiter(),
+                  getSkipHeaderLines()))
           .withCompression(getCompression());
     }
 
@@ -468,6 +482,8 @@ public abstract static class ReadAll
     @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
     abstract byte @Nullable [] getDelimiter();
 
+    abstract int getSkipHeaderLines();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -478,6 +494,8 @@ abstract static class Builder {
 
       abstract Builder setDelimiter(byte @Nullable [] delimiter);
 
+      abstract Builder setSkipHeaderLines(int skipHeaderLines);
+
       abstract ReadAll build();
     }
 
@@ -560,6 +578,8 @@ public abstract static class ReadFiles
     @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
     abstract byte @Nullable [] getDelimiter();
 
+    abstract int getSkipHeaderLines();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -568,6 +588,8 @@ abstract static class Builder {
 
       abstract Builder setDelimiter(byte @Nullable [] delimiter);
 
+      abstract Builder setSkipHeaderLines(int skipHeaderLines);
+
       abstract ReadFiles build();
     }
 
@@ -581,13 +603,17 @@ public ReadFiles withDelimiter(byte[] delimiter) {
       return toBuilder().setDelimiter(delimiter).build();
     }
 
+    public ReadFiles withSkipHeaderLines(int skipHeaderLines) {
+      return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
+    }
+
     @Override
     public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
       return input.apply(
           "Read all via FileBasedSource",
           new ReadAllViaFileBasedSource<>(
               getDesiredBundleSizeBytes(),
-              new CreateTextSourceFn(getDelimiter()),
+              new CreateTextSourceFn(getDelimiter(), getSkipHeaderLines()),
               StringUtf8Coder.of()));
     }
 
@@ -602,15 +628,20 @@ public void populateDisplayData(DisplayData.Builder builder) {
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private byte[] delimiter;
+      private int skipHeaderLines;
 
-      private CreateTextSourceFn(byte[] delimiter) {
+      private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) {
         this.delimiter = delimiter;
+        this.skipHeaderLines = skipHeaderLines;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
         return new TextSource(
-            StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter);
+            StaticValueProvider.of(input),
+            EmptyMatchTreatment.DISALLOW,
+            delimiter,
+            skipHeaderLines);
       }
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
index 32b7fb12f414..8542ce011098 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java
@@ -46,6 +46,8 @@ public abstract class TextRowCountEstimator {
   @SuppressWarnings("mutable")
   public abstract byte @Nullable [] getDelimiters();
 
+  public abstract int getSkipHeaderLines();
+
   public abstract String getFilePattern();
 
   public abstract Compression getCompression();
@@ -62,7 +64,8 @@ public static TextRowCountEstimator.Builder builder() {
         .setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE)
         .setCompression(DEFAULT_COMPRESSION)
         .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT)
-        .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT);
+        .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT)
+        .setSkipHeaderLines(0);
   }
 
   /**
@@ -114,7 +117,8 @@ public Double estimateRowCount(PipelineOptions pipelineOptions)
           new TextSource(
               ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()),
               getEmptyMatchTreatment(),
-              getDelimiters());
+              getDelimiters(),
+              getSkipHeaderLines());
       FileBasedSource<String> source =
           CompressedSource.from(textSource).withCompression(file.getCompression());
       try (BoundedSource.BoundedReader<String> reader =
@@ -160,6 +164,8 @@ public abstract Builder setDirectoryTreatment(
 
     public abstract Builder setDelimiters(byte @Nullable [] delimiters);
 
+    public abstract Builder setSkipHeaderLines(int skipHeaderLines);
+
     public abstract Builder setFilePattern(String filePattern);
 
     public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index bef30dffa8ac..3d62c677950a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -56,26 +56,43 @@
 public class TextSource extends FileBasedSource<String> {
   byte[] delimiter;
 
+  int skipHeaderLines;
+
   public TextSource(
-      ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      int skipHeaderLines) {
     super(fileSpec, emptyMatchTreatment, 1L);
     this.delimiter = delimiter;
+    this.skipHeaderLines = skipHeaderLines;
   }
 
-  public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
+  public TextSource(
+      ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
+    this(fileSpec, emptyMatchTreatment, delimiter, 0);
+  }
+
+  public TextSource(
+      MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) {
     super(metadata, 1L, start, end);
     this.delimiter = delimiter;
+    this.skipHeaderLines = skipHeaderLines;
+  }
+
+  public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
+    this(metadata, start, end, delimiter, 0);
   }
 
   @Override
   protected FileBasedSource<String> createForSubrangeOfFile(
       MatchResult.Metadata metadata, long start, long end) {
-    return new TextSource(metadata, start, end, delimiter);
+    return new TextSource(metadata, start, end, delimiter, skipHeaderLines);
   }
 
   @Override
   protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
-    return new TextBasedReader(this, delimiter);
+    return new TextBasedReader(this, delimiter, skipHeaderLines);
   }
 
   @Override
@@ -98,6 +115,7 @@ static class TextBasedReader extends FileBasedReader<String> {
     private static final byte LF = '\n';
 
     private final byte @Nullable [] delimiter;
+    private final int skipHeaderLines;
     private final ByteArrayOutputStream str;
     private final byte[] buffer;
     private final ByteBuffer byteBuffer;
@@ -112,11 +130,16 @@ static class TextBasedReader extends FileBasedReader<String> {
     private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer
 
     private TextBasedReader(TextSource source, byte[] delimiter) {
+      this(source, delimiter, 0);
+    }
+
+    private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines) {
       super(source);
       this.buffer = new byte[READ_BUFFER_SIZE];
       this.str = new ByteArrayOutputStream();
       this.byteBuffer = ByteBuffer.wrap(buffer);
       this.delimiter = delimiter;
+      this.skipHeaderLines = skipHeaderLines;
     }
 
     @Override
@@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
           } else {
             startOfNextRecord = bufferPosn = (int) requiredPosition;
           }
+          skipHeader(skipHeaderLines, true);
         } else {
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          startOfNextRecord = requiredPosition;
+          skipHeader(skipHeaderLines, false);
+          if (requiredPosition > startOfNextRecord) {
+            ((SeekableByteChannel) channel).position(requiredPosition);
+            startOfNextRecord = requiredPosition;
+            bufferLength = bufferPosn = 0;
+          }
+          // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
+          // to the beginning of the next record.
+          readNextRecord();
+          currentValue = null;
         }
 
-        // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
-        // to the beginning of the next record.
-        readNextRecord();
-        currentValue = null;
       } else {
         // Check to see if we start with the UTF_BOM bytes skipping them if present.
         if (fileStartsWithBom()) {
           startOfNextRecord = bufferPosn = UTF8_BOM.size();
         }
+        skipHeader(skipHeaderLines, false);
+      }
+    }
+
+    private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException {
+      if (headerLines == 1) {
+        readNextRecord();
+      } else if (headerLines > 1) {
+        // this will be expensive
+        ((SeekableByteChannel) inChannel).position(0);
+        for (int line = 0; line < headerLines; ++line) {
+          readNextRecord();
+        }
+      } else if (headerLines == 0 && skipFirstLine) {
+        readNextRecord();
       }
+      currentValue = null;
     }
 
     private boolean fileStartsWithBom() throws IOException {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 379345b1001e..84c05ee6c906 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -245,15 +245,24 @@ private static File createZipFile(
   }
 
   private static TextSource prepareSource(
-      TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter) throws IOException {
+      TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, int skipHeaderLines)
+      throws IOException {
     Path path = temporaryFolder.newFile().toPath();
     Files.write(path, data);
-    return getTextSource(path.toString(), delimiter);
+    return getTextSource(path.toString(), delimiter, skipHeaderLines);
   }
 
-  public static TextSource getTextSource(String path, @Nullable byte[] delimiter) {
+  public static TextSource getTextSource(
+      String path, @Nullable byte[] delimiter, int skipHeaderLines) {
     return new TextSource(
-        ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter);
+        ValueProvider.StaticValueProvider.of(path),
+        EmptyMatchTreatment.DISALLOW,
+        delimiter,
+        skipHeaderLines);
+  }
+
+  public static TextSource getTextSource(String path, @Nullable byte[] delimiter) {
+    return getTextSource(path, delimiter, 0);
   }
 
   private static String getFileSuffix(Compression compression) {
@@ -384,7 +393,7 @@ public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel
       Files.write(path, line.getBytes(UTF_8));
       Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
       FileBasedSource source =
-          getTextSource(path.toString(), null)
+          getTextSource(path.toString(), null, 0)
               .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
       FileBasedReader<String> reader =
           source.createSingleFileReader(PipelineOptionsFactory.create());
@@ -433,7 +442,49 @@ public void testSplittingSource() throws Exception {
     }
 
     private TextSource prepareSource(byte[] data) throws IOException {
-      return TextIOReadTest.prepareSource(tempFolder, data, null);
+      return TextIOReadTest.prepareSource(tempFolder, data, null, 0);
+    }
+
+    private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception {
+      TextSource source = prepareSource(data);
+      List<String> actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());
+      assertThat(
+          actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0])));
+    }
+  }
+
+  /** Tests for reading files with/without header. */
+  @RunWith(Parameterized.class)
+  public static class SkippingHeaderTest {
+    private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Iterable<Object[]> data() {
+      return ImmutableList.<Object[]>builder()
+          .add(new Object[] {"\n\n\n", ImmutableList.of("", ""), 1})
+          .add(new Object[] {"\n", ImmutableList.of(), 1})
+          .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED, 1})
+          .add(new Object[] {"header1\nheader2\nasdf\nhjkl\nxyz\n", EXPECTED, 2})
+          .build();
+    }
+
+    @Parameterized.Parameter(0)
+    public String line;
+
+    @Parameterized.Parameter(1)
+    public ImmutableList<String> expected;
+
+    @Parameterized.Parameter(2)
+    public int skipHeaderLines;
+
+    @Test
+    public void testReadLines() throws Exception {
+      runTestReadWithData(line.getBytes(UTF_8), expected);
+    }
+
+    private TextSource prepareSource(byte[] data) throws IOException {
+      return TextIOReadTest.prepareSource(tempFolder, data, null, skipHeaderLines);
     }
 
     private void runTestReadWithData(byte[] data, List<String> expectedResults) throws Exception {
@@ -477,7 +528,8 @@ public static Iterable<Object[]> data() {
     @Test
     public void testReadLinesWithCustomDelimiter() throws Exception {
       SourceTestUtils.assertSplitAtFractionExhaustive(
-          TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}),
+          TextIOReadTest.prepareSource(
+              tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, 0),
           PipelineOptionsFactory.create());
     }
 
@@ -489,7 +541,7 @@ public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel(
       Files.write(path, testCase.getBytes(UTF_8));
       Metadata metadata = FileSystems.matchSingleFileSpec(path.toString());
       FileBasedSource source =
-          getTextSource(path.toString(), delimiter)
+          getTextSource(path.toString(), delimiter, 0)
               .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
       FileBasedReader<String> reader =
           source.createSingleFileReader(PipelineOptionsFactory.create());
@@ -743,7 +795,7 @@ public void testTextIOGetName() {
     }
 
     private TextSource prepareSource(byte[] data) throws IOException {
-      return TextIOReadTest.prepareSource(tempFolder, data, null);
+      return TextIOReadTest.prepareSource(tempFolder, data, null, 0);
     }
 
     @Test
@@ -977,7 +1029,8 @@ public void testReadFilesWithFilename() throws IOException {
               new TextSource(
                   ValueProvider.StaticValueProvider.of(input),
                   EmptyMatchTreatment.DISALLOW,
-                  new byte[] {'\n'});
+                  new byte[] {'\n'},
+                  0);
 
       PCollection<KV<String, String>> lines =
           p.apply(
@@ -1102,7 +1155,7 @@ public void processElement(ProcessContext c) {
             ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
         // Create a TextSource, passing null as the delimiter to use the default
         // delimiters ('\n', '\r', or '\r\n').
-        TextSource textSource = new TextSource(filenameProvider, null, null);
+        TextSource textSource = new TextSource(filenameProvider, null, null, 0);
         try {
           BoundedSource.BoundedReader<String> reader =
               textSource