Skip to content

Commit

Permalink
Merge pull request #1121 from jetstreamapp/bug/fix-ip-address-record-…
Browse files Browse the repository at this point in the history
…overwrite

Fix ip address table overwrite
  • Loading branch information
paustint authored Dec 21, 2024
2 parents 29344a3 + 571b0a7 commit d159d9e
Showing 1 changed file with 61 additions and 14 deletions.
75 changes: 61 additions & 14 deletions apps/cron-tasks/src/geo-ip-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,36 @@ const CITY_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-City-CSV
const CITY_FILENAME = 'GeoLite2-City.zip';
const CITY_FILENAMES = ['GeoLite2-City-Locations-en.csv', 'GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Blocks-IPv6.csv'];

async function importCSVToTable(csvPath: string, tableName: string, schema: string): Promise<void> {
async function importCSVToTable(csvPath: string, tableName: string, schema: string, createTemp: boolean): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Create temporary table with same structure
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`);
// Only create temp table if requested (first file)
if (createTemp) {
logger.info(`Creating temporary table ${fullTempTableName}`);
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`);
await execAsync(
`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`
);
}

// Import CSV data
logger.info(`Importing ${csvPath} to ${fullTempTableName}`);
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "\\COPY ${fullTempTableName} FROM '${csvPath}' WITH (FORMAT CSV, HEADER)"`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
throw error;
}
}

async function swapTables(tableName: string, schema: string): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Atomic swap
await execAsync(`
psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "
Expand All @@ -89,13 +107,11 @@ async function importCSVToTable(csvPath: string, tableName: string, schema: stri
COMMIT;
"
`);

logger.info(`Successfully imported ${csvPath} to ${fullTableName}`);
logger.info(`Successfully swapped ${fullTempTableName} to ${fullTableName}`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
logger.error(getExceptionLog(error), `Error swapping tables: %s`, error.message);
// Cleanup temp table if it exists
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`).catch(() => {
// Ignore errors
logger.warn(`Failed to drop table ${fullTempTableName}`);
});
throw error;
Expand Down Expand Up @@ -184,35 +200,66 @@ async function streamToBuffer(stream: ReadableStream): Promise<Buffer> {
return Buffer.concat(chunks);
}

async function processNetwork(csvPath: string) {
await importCSVToTable(csvPath, 'network', 'geo_ip');
const tempTablesNeedToBeCreated = {
network: true,
location: true,
organization: true,
};

async function processNetwork(filename: string, csvPath: string) {
await importCSVToTable(csvPath, 'network', 'geo_ip', tempTablesNeedToBeCreated.network);
tempTablesNeedToBeCreated.network = false;
}

async function processLocation(csvPath: string) {
await importCSVToTable(csvPath, 'location', 'geo_ip');
await importCSVToTable(csvPath, 'location', 'geo_ip', tempTablesNeedToBeCreated.location);
tempTablesNeedToBeCreated.location = false;
}

async function processASN(filename: string, csvPath: string) {
await importCSVToTable(csvPath, 'organization', 'geo_ip', tempTablesNeedToBeCreated.organization);
tempTablesNeedToBeCreated.organization = false;
}

async function processASN(csvPath: string) {
await importCSVToTable(csvPath, 'organization', 'geo_ip');
async function cleanupTempTables(): Promise<void> {
const tables = ['network', 'location', 'organization'];

for (const table of tables) {
const tempTableName = `geo_ip.${table}_temp`;
try {
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${tempTableName}"`);
logger.info(`Cleaned up temporary table ${tempTableName}`);
} catch (error) {
logger.warn(`Failed to drop temporary table ${tempTableName}: ${error.message}`);
}
}
}

async function main() {
try {
logger.info('Starting GeoIP database update...');

// Clean up any leftover temp tables first
await cleanupTempTables();

// Process ASN data
await processFile(ASN_URL, ASN_FILENAME, ASN_FILENAMES, async (filename, csvPath) => {
await processASN(csvPath);
await processASN(filename, csvPath);
});
// Swap ASN tables after all files are processed
await swapTables('organization', 'geo_ip');

// Process City data
await processFile(CITY_URL, CITY_FILENAME, CITY_FILENAMES, async (filename, csvPath) => {
if (filename.includes('Blocks')) {
await processNetwork(csvPath);
await processNetwork(filename, csvPath);
} else if (filename.includes('Locations')) {
await processLocation(csvPath);
}
});
// Swap network/location tables after all files are processed
await swapTables('location', 'geo_ip');
await swapTables('network', 'geo_ip');

logger.info('GeoIP database update completed successfully');
} catch (error) {
Expand Down

0 comments on commit d159d9e

Please sign in to comment.