Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ip address table overwrite #1121

Merged
merged 1 commit into from
Dec 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 61 additions & 14 deletions apps/cron-tasks/src/geo-ip-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,36 @@ const CITY_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-City-CSV
const CITY_FILENAME = 'GeoLite2-City.zip';
const CITY_FILENAMES = ['GeoLite2-City-Locations-en.csv', 'GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Blocks-IPv6.csv'];

async function importCSVToTable(csvPath: string, tableName: string, schema: string): Promise<void> {
async function importCSVToTable(csvPath: string, tableName: string, schema: string, createTemp: boolean): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Create temporary table with same structure
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`);
// Only create temp table if requested (first file)
if (createTemp) {
logger.info(`Creating temporary table ${fullTempTableName}`);
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`);
await execAsync(
`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`
);
}

// Import CSV data
logger.info(`Importing ${csvPath} to ${fullTempTableName}`);
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "\\COPY ${fullTempTableName} FROM '${csvPath}' WITH (FORMAT CSV, HEADER)"`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
throw error;
}
}

async function swapTables(tableName: string, schema: string): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Atomic swap
await execAsync(`
psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "
Expand All @@ -89,13 +107,11 @@ async function importCSVToTable(csvPath: string, tableName: string, schema: stri
COMMIT;
"
`);

logger.info(`Successfully imported ${csvPath} to ${fullTableName}`);
logger.info(`Successfully swapped ${fullTempTableName} to ${fullTableName}`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
logger.error(getExceptionLog(error), `Error swapping tables: %s`, error.message);
// Cleanup temp table if it exists
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`).catch(() => {
// Ignore errors
logger.warn(`Failed to drop table ${fullTempTableName}`);
});
throw error;
Expand Down Expand Up @@ -184,35 +200,66 @@ async function streamToBuffer(stream: ReadableStream): Promise<Buffer> {
return Buffer.concat(chunks);
}

async function processNetwork(csvPath: string) {
await importCSVToTable(csvPath, 'network', 'geo_ip');
const tempTablesNeedToBeCreated = {
network: true,
location: true,
organization: true,
};

async function processNetwork(filename: string, csvPath: string) {
await importCSVToTable(csvPath, 'network', 'geo_ip', tempTablesNeedToBeCreated.network);
tempTablesNeedToBeCreated.network = false;
}

async function processLocation(csvPath: string) {
await importCSVToTable(csvPath, 'location', 'geo_ip');
await importCSVToTable(csvPath, 'location', 'geo_ip', tempTablesNeedToBeCreated.location);
tempTablesNeedToBeCreated.location = false;
}

async function processASN(filename: string, csvPath: string) {
await importCSVToTable(csvPath, 'organization', 'geo_ip', tempTablesNeedToBeCreated.organization);
tempTablesNeedToBeCreated.organization = false;
}

async function processASN(csvPath: string) {
await importCSVToTable(csvPath, 'organization', 'geo_ip');
async function cleanupTempTables(): Promise<void> {
const tables = ['network', 'location', 'organization'];

for (const table of tables) {
const tempTableName = `geo_ip.${table}_temp`;
try {
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${tempTableName}"`);
logger.info(`Cleaned up temporary table ${tempTableName}`);
} catch (error) {
logger.warn(`Failed to drop temporary table ${tempTableName}: ${error.message}`);
}
}
}

async function main() {
try {
logger.info('Starting GeoIP database update...');

// Clean up any leftover temp tables first
await cleanupTempTables();

// Process ASN data
await processFile(ASN_URL, ASN_FILENAME, ASN_FILENAMES, async (filename, csvPath) => {
await processASN(csvPath);
await processASN(filename, csvPath);
});
// Swap ASN tables after all files are processed
await swapTables('organization', 'geo_ip');

// Process City data
await processFile(CITY_URL, CITY_FILENAME, CITY_FILENAMES, async (filename, csvPath) => {
if (filename.includes('Blocks')) {
await processNetwork(csvPath);
await processNetwork(filename, csvPath);
} else if (filename.includes('Locations')) {
await processLocation(csvPath);
}
});
// Swap network/location tables after all files are processed
await swapTables('location', 'geo_ip');
await swapTables('network', 'geo_ip');

logger.info('GeoIP database update completed successfully');
} catch (error) {
Expand Down
Loading