Skip to content

Commit

Permalink
feat(be): Added trending
Browse files Browse the repository at this point in the history
  • Loading branch information
MendyBerger committed May 20, 2024
1 parent f4af5b7 commit 4e85507
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 17 deletions.
2 changes: 1 addition & 1 deletion backend/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ csv = "1.1.6"
async-stripe = { version = "0.34.1", features = ["runtime-tokio-hyper-rustls"] }
bigdecimal = "0.4.0"
mime = "0.3.16"
once_cell = "1.14.0"

# project deps
ji_core = { path = "../ji_core", features = ["db"] }
Expand Down Expand Up @@ -110,5 +111,4 @@ opt-level = 3

[dev-dependencies]
insta = { version = "1.20.0", features = ["redactions", "json"] }
once_cell = "1.14.0"
yup-oauth2 = { version = "7.0.1", features = ["hyper-rustls"] }
26 changes: 22 additions & 4 deletions backend/api/src/algolia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,8 @@ fn media_filter(kind: MediaGroupKind, invert: bool) -> CommonFilter<FacetFilter>
#[derive(Clone)]
pub struct Client {
inner: Inner,
application_id: String,
search_key: String,
media_index: String,
jig_index: String,
resource_index: String,
Expand All @@ -1881,10 +1883,11 @@ pub struct Client {
impl Client {
pub fn new(settings: Option<AlgoliaSettings>) -> anyhow::Result<Option<Self>> {
if let Some(settings) = settings {
let app_id = algolia::AppId::new(settings.application_id);
let app_id = algolia::AppId::new(settings.application_id.clone());

let (
inner,
search_key,
media_index,
jig_index,
resource_index,
Expand All @@ -1905,7 +1908,7 @@ impl Client {
settings.course_index,
) {
(
Some(key),
Some(search_key),
Some(media_index),
Some(jig_index),
Some(resource_index),
Expand All @@ -1915,7 +1918,8 @@ impl Client {
Some(user_index),
Some(course_index),
) => (
Inner::new(app_id, ApiKey(key))?,
Inner::new(app_id.clone(), ApiKey(search_key.clone()))?,
search_key,
media_index,
jig_index,
resource_index,
Expand All @@ -1929,6 +1933,8 @@ impl Client {
};

Ok(Some(Self {
search_key,
application_id: settings.application_id,
inner,
media_index,
jig_index,
Expand Down Expand Up @@ -2048,7 +2054,7 @@ impl Client {
page_limit: u32,
blocked: Option<bool>,
is_rated: Option<bool>,
) -> anyhow::Result<Option<(Vec<Uuid>, u32, u64)>> {
) -> anyhow::Result<Option<(Vec<JigId>, u32, u64)>> {
let mut and_filters = algolia::filter::AndFilter { filters: vec![] };

if let Some(author_id) = author_id {
Expand Down Expand Up @@ -2752,4 +2758,16 @@ impl Client {

Ok(Some((results, pages, total_hits)))
}

pub fn application_id(&self) -> &str {
&self.application_id
}

pub fn jig_index(&self) -> &str {
&self.jig_index
}

pub fn search_key(&self) -> &str {
&self.search_key
}
}
4 changes: 2 additions & 2 deletions backend/api/src/db/jig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ from jig_data
#[instrument(skip(db))]
pub async fn get_by_ids(
db: &PgPool,
ids: &[Uuid],
ids: &[JigId],
draft_or_live: DraftOrLive,
user_id: Option<UserId>,
) -> sqlx::Result<Vec<JigResponse>> {
Expand Down Expand Up @@ -372,7 +372,7 @@ from jig
inner join jig_admin_data "admin" on admin.jig_id = jig.id
order by ord asc
"#,
ids,
&ids.iter().map(|i| i.0).collect::<Vec<Uuid>>(),
user_id.map(|x| x.0)
)
.fetch_all(&mut txn)
Expand Down
26 changes: 24 additions & 2 deletions backend/api/src/http/endpoints/jig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use actix_web::{
};
use futures::try_join;
use ji_core::settings::RuntimeSettings;
use shared::domain::user::UserScope;
use shared::domain::{jig::JigTrendingResponse, user::UserScope};
use shared::{
api::{endpoints::jig, ApiEndpoint, PathParts},
domain::{
Expand All @@ -24,9 +24,10 @@ use uuid::Uuid;

use crate::{
db::{self, jig::CreateJigError},
error::{self},
error,
extractor::{get_user_id, ScopeAdmin, TokenUser, TokenUserWithScope},
service::ServiceData,
trending,
};

mod codes;
Expand Down Expand Up @@ -337,6 +338,23 @@ async fn search(
}))
}

/// Trending jigs.
#[instrument(skip_all)]
async fn trending(
db: Data<PgPool>,
claims: Option<TokenUser>,
algolia: ServiceData<crate::algolia::Client>,
) -> Result<Json<<jig::Trending as ApiEndpoint>::Res>, ServiceError> {
let user_id = claims.map(|c| c.user_id());
let ids = trending::get_trending(algolia).await?;

let jigs = db::jig::get_by_ids(&db, &ids, DraftOrLive::Live, user_id)
.await
.into_anyhow()?;

Ok(Json(JigTrendingResponse { jigs }))
}

/// Update a JIG's admin data.
async fn update_admin_data(
db: Data<PgPool>,
Expand Down Expand Up @@ -549,6 +567,10 @@ pub fn configure(cfg: &mut ServiceConfig) {
<jig::Search as ApiEndpoint>::Path::PATH,
jig::Search::METHOD.route().to(search),
)
.route(
<jig::Trending as ApiEndpoint>::Path::PATH,
jig::Trending::METHOD.route().to(trending),
)
.route(
<jig::UpdateDraftData as ApiEndpoint>::Path::PATH,
jig::UpdateDraftData::METHOD.route().to(update_draft),
Expand Down
1 change: 1 addition & 0 deletions backend/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub mod service;
pub mod stripe;
pub mod token;
pub mod translate;
pub mod trending;

// // todo: make this configurable?
// const ARGON2_DEFAULT_PARAMS: argon2::Params = argon2::Params {
Expand Down
120 changes: 120 additions & 0 deletions backend/api/src/trending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// TODO: move some of this logic to the algolia module and/or algolia crate

use std::{pin::Pin, sync::RwLock};

use futures::{future::Shared, Future, FutureExt};
use serde::Deserialize;
use serde_json::json;
use shared::domain::jig::JigId;

use crate::service::ServiceData;

static TRENDING_CACHE: RwLock<TrendingCacheState> = RwLock::new(TrendingCacheState::Init);

const REFRESH_INTERVAL_SECONDS: i64 = 60 * 60; // 1 hour
const TRENDING_ITEMS: &str = "trending-items";
const TRENDING_JIG_COUNT: u32 = 20;

#[derive(Clone)]
enum TrendingCacheState {
Init,
Loading(Shared<Pin<Box<dyn Future<Output = Result<Vec<JigId>, ()>> + Send + Sync>>>),
Loaded(TrendingCache),
}

#[derive(Clone)]
struct TrendingCache {
expires_at: chrono::DateTime<chrono::Utc>,
jigs: Vec<JigId>,
}

#[derive(Deserialize, Debug)]
struct Response {
results: Vec<ResponseResult>,
}

#[derive(Deserialize, Debug)]
struct ResponseResult {
hits: Vec<Jig>,
}

#[derive(Deserialize, Debug)]
struct Jig {
#[serde(rename = "objectID")]
pub object_id: JigId,
}

async fn fetch_trending_algolia(
algolia: ServiceData<crate::algolia::Client>,
) -> Result<Vec<JigId>, ()> {
let client = reqwest::Client::new();
let mut res = client
.post(&format!(
"https://{}-dsn.algolia.net/1/indexes/*/recommendations",
algolia.application_id()
))
.header("x-algolia-api-key", algolia.search_key())
.header("x-algolia-application-id", algolia.application_id())
.json(&json!({
"requests": [{
"indexName": algolia.jig_index(),
"model": TRENDING_ITEMS,
"threshold": 0,
"maxRecommendations": TRENDING_JIG_COUNT,
}]
}))
.send()
.await
.map_err(|e| {
log::error!("{:?}", e);
()
})?
.json::<Response>()
.await
.map_err(|e| {
log::error!("{:?}", e);
()
})?;

let res = res
.results
.remove(0)
.hits
.into_iter()
.map(|jig| jig.object_id)
.collect::<Vec<JigId>>();
Ok(res)
}

pub async fn get_trending(
algolia: ServiceData<crate::algolia::Client>,
) -> anyhow::Result<Vec<JigId>> {
// TODO: instead of unwrapping the lock, reset the value and clear the locks poison
// check the following example https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.clear_poison
let cache = TRENDING_CACHE.read().unwrap().clone();
Ok(match cache {
TrendingCacheState::Loaded(l) if l.expires_at > chrono::Utc::now() => l.jigs.clone(),
TrendingCacheState::Loading(future) => future
.await
.map_err(|_| anyhow::Error::msg("failed to fetch trending jigs from algolia"))?,
_ => {
let future = Box::pin(fetch_trending_algolia(algolia))
as Pin<Box<dyn Future<Output = _> + Send + Sync + 'static>>;
let future = future.shared();
*TRENDING_CACHE.write().unwrap() = TrendingCacheState::Loading(future.clone());
let jigs = future
.await
.map_err(|_| anyhow::Error::msg("failed to fetch trending jigs from algolia"))?;
let expires_at = expiration_time();
*TRENDING_CACHE.write().unwrap() = TrendingCacheState::Loaded(TrendingCache {
expires_at,
jigs: jigs.clone(),
});
jigs
}
})
}

fn expiration_time() -> chrono::DateTime<chrono::Utc> {
chrono::Utc::now() + chrono::Duration::seconds(REFRESH_INTERVAL_SECONDS)
}
1 change: 1 addition & 0 deletions backend/stable_id/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 11 additions & 8 deletions shared/rust/src/api/endpoints/jig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,7 @@ use crate::{
api::Method,
domain::{
jig::{
GetJigPlaylistsPath, GetJigPlaylistsResponse, JigAdminDataUpdatePath,
JigAdminTransferRequest, JigBrowsePath, JigBrowseQuery, JigBrowseResponse,
JigClonePath, JigCountPath, JigCountResponse, JigCoverPath, JigCreatePath,
JigCreateRequest, JigDeleteAllPath, JigDeletePath, JigGetDraftPath, JigGetLivePath,
JigId, JigLikePath, JigLikedPath, JigLikedResponse, JigPlayPath, JigPublishPath,
JigResponse, JigSearchPath, JigSearchQuery, JigSearchResponse, JigTransferAdminPath,
JigUnlikePath, JigUpdateAdminDataRequest, JigUpdateDraftDataPath,
JigUpdateDraftDataRequest,
GetJigPlaylistsPath, GetJigPlaylistsResponse, JigAdminDataUpdatePath, JigAdminTransferRequest, JigBrowsePath, JigBrowseQuery, JigBrowseResponse, JigClonePath, JigCountPath, JigCountResponse, JigCoverPath, JigCreatePath, JigCreateRequest, JigDeleteAllPath, JigDeletePath, JigGetDraftPath, JigGetLivePath, JigId, JigLikePath, JigLikedPath, JigLikedResponse, JigPlayPath, JigPublishPath, JigResponse, JigSearchPath, JigSearchQuery, JigSearchResponse, JigTransferAdminPath, JigTrendingPath, JigTrendingResponse, JigUnlikePath, JigUpdateAdminDataRequest, JigUpdateDraftDataPath, JigUpdateDraftDataRequest
},
CreateResponse,
},
Expand Down Expand Up @@ -143,6 +136,16 @@ impl ApiEndpoint for Search {
const METHOD: Method = Method::Get;
}

/// Trending JIGs.
pub struct Trending;
impl ApiEndpoint for Trending {
type Req = ();
type Res = JigTrendingResponse;
type Path = JigTrendingPath;
type Err = EmptyError;
const METHOD: Method = Method::Get;
}

/// Clone a JIG. This clones both the draft and live.
///
/// # Authorization
Expand Down
10 changes: 10 additions & 0 deletions shared/rust/src/domain/jig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,16 @@ pub struct JigSearchResponse {
pub total_jig_count: u64,
}

make_path_parts!(JigTrendingPath => "/v1/jig/trending");

/// Response for request for trending.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct JigTrendingResponse {
/// the jigs returned.
pub jigs: Vec<JigResponse>,
}

/// Response for successfully finding the draft of a jig.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
Expand Down

0 comments on commit 4e85507

Please sign in to comment.