Skip to content

Commit

Permalink
[ISSUES nacos-group#180]fix: user not found
Browse files Browse the repository at this point in the history
  • Loading branch information
onewe committed Jul 22, 2023
1 parent 34124e2 commit 868614e
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/common/remote/grpc/layers/auth.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll, time::Duration};
use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll, thread, time::Duration};

use async_stream::stream;
use futures::Future;
use tokio::time::sleep;
use tokio::{sync::oneshot, time::sleep};
use tower::{Layer, Service};
use tracing::{debug, debug_span, Instrument};
use tracing::{debug, debug_span, info, Instrument};

use crate::{
api::{
Expand All @@ -31,31 +31,44 @@ impl AuthLayer {
auth_params: HashMap<String, String>,
id: String,
) -> Self {
AuthLayer::login_task(auth_plugin.clone(), server_list, auth_params, id);
Self::init(auth_plugin.clone(), server_list, auth_params, id);
Self { auth_plugin }
}

fn login_task(
fn init(
auth_plugin: Arc<dyn AuthPlugin>,
server_list: Vec<String>,
auth_params: HashMap<String, String>,
id: String,
) {
let _span_enter = debug_span!("auth_task", id = id).entered();
let auth_context = AuthContext::default().add_params(auth_params);
// auth_plugin.login(server_list.clone(), auth_context.clone());
let (tx, rx) = oneshot::channel::<()>();
executor::spawn(
async move {
info!("init auth task");
let auth_context = AuthContext::default().add_params(auth_params);
auth_plugin
.login(server_list.clone(), auth_context.clone())
.in_current_span()
.await;
info!("init auth finish");
let _ = tx.send(());

info!("auth plugin task start.");
loop {
auth_plugin
.login(server_list.clone(), auth_context.clone())
.in_current_span()
.await;
debug!("auth_plugin schedule at fixed delay");
sleep(Duration::from_secs(30)).await;
}
}
.in_current_span(),
.instrument(debug_span!("auth_task", id = id)),
);

let wait_ret = thread::spawn(move || rx.blocking_recv());

let _ = wait_ret.join().unwrap();
}
}

Expand All @@ -79,8 +92,7 @@ impl Service<Payload> for AuthUnaryCallService {
}

fn call(&mut self, mut req: Payload) -> Self::Future {
let login_identity = self.auth_plugin.get_login_identity();
let contexts = login_identity.contexts;
let contexts = self.auth_plugin.get_login_identity().contexts;

let metadata = req.metadata.take();
let metadata = if let Some(mut metadata) = metadata {
Expand Down Expand Up @@ -123,8 +135,7 @@ impl Service<GrpcStream<Payload>> for AuthBiStreamingCallService {
let auth_plugin = self.auth_plugin.clone();
let stream = stream! {
for await mut value in req {
let login_identity = auth_plugin.get_login_identity();
let contexts = login_identity.contexts;
let contexts = auth_plugin.get_login_identity().contexts;

let metadata = value.metadata.take();
let metadata = if let Some(mut metadata) = metadata {
Expand Down

0 comments on commit 868614e

Please sign in to comment.