Skip to content

Commit

Permalink
Fix ip address table overwrite
Browse files Browse the repository at this point in the history
IPv4 and IPv6 blocks are stored in different files, and we were creating a new temp table for each file, so not all of the results were retained.
  • Loading branch information
paustint committed Dec 21, 2024
1 parent 2682489 commit 571b0a7
Showing 1 changed file with 61 additions and 14 deletions.
75 changes: 61 additions & 14 deletions apps/cron-tasks/src/geo-ip-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,36 @@ const CITY_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-City-CSV
const CITY_FILENAME = 'GeoLite2-City.zip';
const CITY_FILENAMES = ['GeoLite2-City-Locations-en.csv', 'GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Blocks-IPv6.csv'];

/**
 * Loads one CSV file into the staging table `<schema>.<tableName>_temp` via psql `\COPY`.
 *
 * The staging table is dropped and recreated (with the same structure as
 * `<schema>.<tableName>`) only when `createTemp` is true. Passing `true` for the first
 * file and `false` for later files lets several CSV files accumulate in one staging table.
 *
 * NOTE(review): `csvPath`, `tableName` and `schema` are interpolated unescaped into a
 * shell command; presumably they are trusted internal values — confirm against callers.
 *
 * @param csvPath - Path of the CSV file to import; the first row is treated as a header.
 * @param tableName - Name of the target table; `_temp` is appended for the staging table.
 * @param schema - Schema that holds both the target and the staging table.
 * @param createTemp - When true, drop any existing staging table and recreate it before importing.
 * @throws Rethrows any error raised by the psql invocations after logging it.
 */
async function importCSVToTable(csvPath: string, tableName: string, schema: string, createTemp: boolean): Promise<void> {
  const tempTableName = `${tableName}_temp`;
  const fullTempTableName = `${schema}.${tempTableName}`;
  const fullTableName = `${schema}.${tableName}`;

  try {
    // Only create temp table if requested (first file)
    if (createTemp) {
      logger.info(`Creating temporary table ${fullTempTableName}`);
      // Drop first so the CREATE below always starts from an empty staging table.
      await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`);
      await execAsync(
        `psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`
      );
    }

    // Import CSV data
    logger.info(`Importing ${csvPath} to ${fullTempTableName}`);
    await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "\\COPY ${fullTempTableName} FROM '${csvPath}' WITH (FORMAT CSV, HEADER)"`);
  } catch (error) {
    // The catch variable may not be an Error; narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, message);
    throw error;
  }
}

async function swapTables(tableName: string, schema: string): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Atomic swap
await execAsync(`
psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "
Expand All @@ -89,13 +107,11 @@ async function importCSVToTable(csvPath: string, tableName: string, schema: stri
COMMIT;
"
`);

logger.info(`Successfully imported ${csvPath} to ${fullTableName}`);
logger.info(`Successfully swapped ${fullTempTableName} to ${fullTableName}`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
logger.error(getExceptionLog(error), `Error swapping tables: %s`, error.message);
// Cleanup temp table if it exists
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`).catch(() => {
// Ignore errors
logger.warn(`Failed to drop table ${fullTempTableName}`);
});
throw error;
Expand Down Expand Up @@ -184,35 +200,66 @@ async function streamToBuffer(stream: ReadableStream): Promise<Buffer> {
return Buffer.concat(chunks);
}

/**
 * Tracks, per target table, whether the `<table>_temp` staging table still has to be
 * created. Each flag starts as `true` and is set to `false` by the matching `process*`
 * function after its first import, so later files are appended to the same staging table.
 */
const tempTablesNeedToBeCreated: Record<'network' | 'location' | 'organization', boolean> = {
  network: true,
  location: true,
  organization: true,
};

/**
 * Imports one network blocks CSV into the `geo_ip.network_temp` staging table.
 *
 * The staging table is created only on the first call; the module-level flag is
 * cleared after a successful import so subsequent files are appended to it.
 *
 * @param filename - Name of the CSV entry being processed (not used by this function).
 * @param csvPath - Path of the CSV file to import.
 */
async function processNetwork(filename: string, csvPath: string) {
  const isFirstNetworkFile = tempTablesNeedToBeCreated.network;
  await importCSVToTable(csvPath, 'network', 'geo_ip', isFirstNetworkFile);
  tempTablesNeedToBeCreated.network = false;
}

/**
 * Imports a locations CSV into the `geo_ip.location_temp` staging table.
 *
 * The staging table is created only on the first call; the module-level flag is
 * cleared after a successful import so any later file is appended instead of
 * replacing earlier rows.
 *
 * @param csvPath - Path of the CSV file to import.
 */
async function processLocation(csvPath: string) {
  await importCSVToTable(csvPath, 'location', 'geo_ip', tempTablesNeedToBeCreated.location);
  tempTablesNeedToBeCreated.location = false;
}

/**
 * Imports one ASN CSV into the `geo_ip.organization_temp` staging table.
 *
 * The staging table is created only on the first call; the module-level flag is
 * cleared after a successful import so subsequent files are appended to it.
 *
 * @param filename - Name of the CSV entry being processed (not used by this function).
 * @param csvPath - Path of the CSV file to import.
 */
async function processASN(filename: string, csvPath: string) {
  const isFirstAsnFile = tempTablesNeedToBeCreated.organization;
  await importCSVToTable(csvPath, 'organization', 'geo_ip', isFirstAsnFile);
  tempTablesNeedToBeCreated.organization = false;
}

/**
 * Drops the `geo_ip.<table>_temp` staging tables for network, location and organization.
 *
 * Best-effort: a failure to drop one table is logged as a warning and does not stop
 * the remaining tables from being processed, and no error is propagated to the caller.
 */
async function cleanupTempTables(): Promise<void> {
  const tables = ['network', 'location', 'organization'];

  for (const table of tables) {
    const tempTableName = `geo_ip.${table}_temp`;
    try {
      await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${tempTableName}"`);
      logger.info(`Cleaned up temporary table ${tempTableName}`);
    } catch (error) {
      // The catch variable may not be an Error; narrow before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);
      logger.warn(`Failed to drop temporary table ${tempTableName}: ${message}`);
    }
  }
}

async function main() {
try {
logger.info('Starting GeoIP database update...');

// Clean up any leftover temp tables first
await cleanupTempTables();

// Process ASN data
await processFile(ASN_URL, ASN_FILENAME, ASN_FILENAMES, async (filename, csvPath) => {
await processASN(csvPath);
await processASN(filename, csvPath);
});
// Swap ASN tables after all files are processed
await swapTables('organization', 'geo_ip');

// Process City data
await processFile(CITY_URL, CITY_FILENAME, CITY_FILENAMES, async (filename, csvPath) => {
if (filename.includes('Blocks')) {
await processNetwork(csvPath);
await processNetwork(filename, csvPath);
} else if (filename.includes('Locations')) {
await processLocation(csvPath);
}
});
// Swap network/location tables after all files are processed
await swapTables('location', 'geo_ip');
await swapTables('network', 'geo_ip');

logger.info('GeoIP database update completed successfully');
} catch (error) {
Expand Down

0 comments on commit 571b0a7

Please sign in to comment.