Skip to content

Commit

Permalink
Define basics for collecting Iceberg metadata for the current snapshot (#3559)
Browse files Browse the repository at this point in the history

* [GOBBLIN-1701] Replace jcenter with either maven central or gradle plugin portal (#3554)

* remove jcenter
* Use gradle plugin portal for shadow
* Use maven central in all other cases

* [GOBBLIN-1695] Fix: Failure to add spec executors doesn't block deployment (#3551)

* Allow first time failure to authenticate with Azkaban to fail silently

* Fix findbugs report

* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow

* Add handling for fetchSession throwing an exception

* Add logging when fails on constructor and initialization, but continue to local deploy

* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor

* Fixed vars

* Revert changes on azkabanSpecProducer

* clean up error throwing

* revert function checking changes

* Reformat file

* Clean up function

* Format file for try/catch

* Allow first time failure to authenticate with Azkaban to fail silently

* Fix findbugs report

* Refactor azkaban authentication into function. Call on init and if session_id is null when adding a flow

* Fixed rebase

* Fixed rebase

* Revert changes for azkabanSpecProducer, but quiet log instead of throw in constructor

* Add whitespace back

* fix helix job wait completion bug when job goes to STOPPING state (#3556)

address comments

update stoppingStateEndTime with currentTime

update test cases

* [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552

* before starting reduce
* after first record is reduced
* after reducing every 1000 records

Co-authored-by: Urmi Mustafi <[email protected]>

* Define basics for collecting Iceberg metadata for the current snapshot

* [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)

* [GOBBLIN-1673] Schema for dynamic work unit message

* [GOBBLIN-1683] Dynamic Work Unit messaging abstractions

* Address review comments

* Correct import order

Co-authored-by: Matthew Ho <[email protected]>
Co-authored-by: Andy Jiang <[email protected]>
Co-authored-by: Hanghang Nate Liu <[email protected]>
Co-authored-by: umustafi <[email protected]>
Co-authored-by: Urmi Mustafi <[email protected]>
Co-authored-by: William Lo <[email protected]>
  • Loading branch information
7 people authored Sep 13, 2022
1 parent ac4a810 commit da914e2
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;


/**
 * Any catalog from which to access {@link IcebergTable}s.
 *
 * <p>A table is addressed by the pair (database name, table name).
 */
public interface IcebergCatalog {
/**
 * Opens the table addressed by the given database and table names.
 *
 * @param dbName name of the database containing the table
 * @param tableName name of the table within that database
 * @return an {@link IcebergTable} handle for the addressed table
 */
// NOTE(review): whether a non-existent table is detected here or only on first use is up to the implementation - confirm.
IcebergTable openTable(String dbName, String tableName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hive.HiveCatalogs;


/**
 * Factory for obtaining an {@link IcebergCatalog}.
 */
public class IcebergCatalogFactory {
  /**
   * Creates a catalog by loading a Hive catalog from the supplied Hadoop configuration
   * and wrapping it in an {@link IcebergHiveCatalog}.
   *
   * @param configuration Hadoop configuration handed to {@code HiveCatalogs.loadCatalog}
   * @return the Hive-backed {@link IcebergCatalog}
   */
  public static IcebergCatalog create(Configuration configuration) {
    final IcebergCatalog hiveBackedCatalog = new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
    return hiveBackedCatalog;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;


/**
 * {@link IcebergCatalog} implementation that resolves tables through a {@link HiveCatalog}.
 */
@Slf4j
@AllArgsConstructor
public class IcebergHiveCatalog implements IcebergCatalog {
  // NOTE: the concrete `HiveCatalog` type is required here, because `BaseMetastoreCatalog.newTableOps` is `protected`!
  private final HiveCatalog hc;

  /**
   * Builds a {@link TableIdentifier} from the two names and wraps the table operations
   * the Hive catalog creates for it in an {@link IcebergTable}.
   *
   * @param dbName name of the database containing the table
   * @param tableName name of the table within that database
   * @return an {@link IcebergTable} over the catalog's table operations for that identifier
   */
  @Override
  public IcebergTable openTable(String dbName, String tableName) {
    TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
    return new IcebergTable(hc.newTableOps(tableId));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

import lombok.Data;

import com.google.common.collect.Lists;


/**
* Information about the metadata file and data file paths of a single Iceberg Snapshot.
*/
@Data
public class IcebergSnapshotInfo {

@Data
public static class ManifestFileInfo {
private final String manifestFilePath;
private final List<String> listedFilePaths;
}

private final Long snapshotId;
private final Instant timestamp;
private final String metadataPath;
private final String manifestListPath;
private final List<ManifestFileInfo> manifestFiles;

public List<String> getManifestFilePaths() {
return manifestFiles.stream().map(ManifestFileInfo::getManifestFilePath).collect(Collectors.toList());
}

public List<String> getAllDataFilePaths() {
return manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
}

public List<String> getAllPaths() {
List<String> result = Lists.newArrayList(metadataPath, manifestListPath);
result.addAll(getManifestFilePaths());
result.addAll(getAllDataFilePaths());
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.time.Instant;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;


/**
 * Exposes metadata information for a single Iceberg table.
 */
@Slf4j
@AllArgsConstructor
public class IcebergTable {
  private final TableOperations tableOps;

  /**
   * Collects the metadata file, manifest list, manifest files, and listed file paths of the
   * table's current snapshot.
   *
   * @return path information describing the current snapshot
   * @throws IOException if reading (or closing the reader of) any manifest file fails
   */
  public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
    // NOTE(review): `current()` and `currentSnapshot()` are dereferenced without a null check; presumably a
    // missing table or a table with no snapshot yet would surface here as a `NullPointerException` - confirm.
    TableMetadata current = tableOps.current();
    Snapshot snapshot = current.currentSnapshot();
    List<ManifestFile> manifests = snapshot.allManifests();
    return new IcebergSnapshotInfo(
        snapshot.snapshotId(),
        Instant.ofEpochMilli(snapshot.timestampMillis()),
        current.metadataFileLocation(),
        snapshot.manifestListLocation(),
        // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
        calcAllManifestFileInfo(manifests, tableOps.io())
    );
  }

  /**
   * Computes the {@link ManifestFileInfo} of every given manifest, preserving input order.
   *
   * @param manifests manifest files to inspect
   * @param io file access used to read each manifest
   * @return one {@link ManifestFileInfo} per manifest, in the same order
   * @throws IOException if reading any manifest fails
   */
  @VisibleForTesting
  static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
    List<ManifestFileInfo> result = Lists.newArrayList();
    for (ManifestFile manifest : manifests) {
      result.add(calcManifestFileInfo(manifest, io));
    }
    return result;
  }

  /**
   * Pairs a manifest's own path with the file paths read from it.
   *
   * @param manifest manifest file to inspect
   * @param io file access used to read the manifest
   * @return the manifest path together with the paths it lists
   * @throws IOException if reading the manifest fails
   */
  @VisibleForTesting
  static ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
    return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
  }

  /**
   * Reads all paths from the given manifest into a list.
   *
   * @param manifest manifest file to read
   * @param io file access used to read the manifest
   * @return the paths produced by {@code ManifestFiles.readPaths}, fully materialized
   * @throws IOException if closing the underlying iterable fails
   */
  @VisibleForTesting
  static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
    // try-with-resources: a failure while reading stays the primary exception, with any `close()` failure
    // attached as suppressed, rather than the `close()` failure replacing it as with a manual `finally`.
    try (CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io)) {
      return Lists.newArrayList(manifestPathsIterable);
    }
  }
}

0 comments on commit da914e2

Please sign in to comment.