Skip to content

Commit

Permalink
Keep-alive management, no client request timeout + technical debt
Browse files Browse the repository at this point in the history
fix(logs): add X-Request-ID to some error responses 
feat(ci): consul version update
feat(handler): configurable keep-alive, no client timeout
  • Loading branch information
bmuddha authored Apr 5, 2022
1 parent 8488b5a commit 3fdb7c7
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 191 deletions.
110 changes: 36 additions & 74 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ variables:
CARGO_HOME: /cache/cacherpc/.cargo
CARGO_TARGET_DIR: "/cache/cacherpc/$CI_COMMIT_REF_SLUG/$CI_JOB_NAME"
CARGO_TARGET_DIR_MASTER: "/cache/cacherpc/master/$CI_JOB_NAME"
STAND: tt_dev

stages:
- pre_build
Expand All @@ -32,7 +33,7 @@ include:
file: 'vars/upver-tag-naming.yml'
- project: 'shared/tasks'
ref: master
file: 'notify/up-version.yml'
file: 'notify/up-version-consul.yml'
- project: 'shared/tasks'
ref: master
file: 'notify/send-failed.yml'
Expand All @@ -48,6 +49,12 @@ include:
- project: 'shared/tasks'
ref: 'master'
file: 'upload/github_binary.yml'
- project: 'shared/tasks'
ref: 'master'
file: 'docker/docker-rust.yml' ### build
- project: 'shared/tasks'
ref: 'master'
file: 'deploy/with-ansible-dev.yml' ### deploy_dev

create_build_image:
stage: pre_build
Expand Down Expand Up @@ -126,85 +133,40 @@ build_artifact:
- ./${UPLOAD_ARTIFACT}
expire_in: 1 day

build_docker:
stage: build
tags: [docker_19_cache]
before_script:
- docker login -u $REGISTRY_LOGIN -p $REGISTRY_PASS
- apk update && apk add git make curl
script:
- PIPELINE_ID_VERSION=$(curl --user "$REPO_LOGIN:$REPO_PASS" https://$REPO_SRV/repository/$ARTIFACTS_PATH/$APPNAME_SHORT/pipelin-id/$CI_PIPELINE_ID/$VERSION_FILE)
- tar -xzf rpm-source.tar.gz
- >
docker build
--build-arg BIN_NAME=${BIN_NAME}
--tag $REGISTRY_PROJECT:$PIPELINE_ID_VERSION
--tag $REGISTRY_PROJECT:latest
-f Dockerfile.dist
.
- docker push $REGISTRY_PROJECT:$PIPELINE_ID_VERSION
- docker push $REGISTRY_PROJECT:latest
# only:
# - tags
# - develop
# - master
# - web

deploy_dev_aws:
stage: deploy_dev
image: gableroux/ansible:2.10.7
tags: [docker19_hetz_nocache]
before_script:
- eval $(ssh-agent -s)
- mkdir ~/.ssh/
- chmod 700 ~/.ssh
- '[[ -f /.dockerenv ]] && echo -e "Host *\n\tStrictHostKeyChecking no\n\n" > ~/.ssh/config'
- echo "$SSH_KNOWN_HOSTS" > ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
- echo "$DEPLOY_SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add - > /dev/null
- apk update && apk add git make curl
script:
- PIPELINE_ID_VERSION=$(curl --user "$REPO_LOGIN:$REPO_PASS" https://$REPO_SRV/repository/$ARTIFACTS_PATH/$APPNAME_SHORT/pipelin-id/$CI_PIPELINE_ID/$VERSION_FILE)
- git clone https://git.zubr.dev/shared/ci-scripts.git
- cd ci-scripts/deploy/cacherpc
- ansible --version
- ansible-playbook -i tt_stg -e "install_version=${PIPELINE_ID_VERSION}" --private-key ~/.ssh/id_rsa deploy.yml
only:
- develop
- master
- tags
- web
deploy_dev:
tags: [tt_deploy_dev]
variables:
LIMIT_HOST: "ad-pub-cacher-13.tt-int.net,ad-pub-cacher-15.tt-int.net"
REMOTE_USER: "ci_ssh_solana_cacherpc"
APP_PATH: $BIN_PATH
APP_USER: sol

deploy_dev_aws_cacher_13:
extends: .deploy_dev_aws_not_master
deploy_dev_13_not_master:
extends: .deploy_dev
tags: [tt_deploy_dev]
variables:
LIMIT_HOST: "ad-pub-cacher-13.tt-int.net"
REMOTE_USER: "ci_ssh_solana_cacherpc"
APP_PATH: $BIN_PATH
APP_USER: sol
only:
when: manual
except:
- master
- tags

deploy_dev_aws_cacher_15:
extends: .deploy_dev_aws_not_master
deploy_dev_15_not_master:
extends: .deploy_dev
tags: [tt_deploy_dev]
variables:
LIMIT_HOST: "ad-pub-cacher-15.tt-int.net"

.deploy_dev_aws_not_master:
stage: deploy_dev
image: gableroux/ansible:2.10.7
tags: [docker19_hetz_nocache]
before_script:
- eval $(ssh-agent -s)
- mkdir ~/.ssh/
- chmod 700 ~/.ssh
- '[[ -f /.dockerenv ]] && echo -e "Host *\n\tStrictHostKeyChecking no\n\n" > ~/.ssh/config'
- echo "$SSH_KNOWN_HOSTS" > ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
- echo "$DEPLOY_SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add - > /dev/null
- apk update && apk add git make curl
script:
- PIPELINE_ID_VERSION=$(curl --user "$REPO_LOGIN:$REPO_PASS" https://$REPO_SRV/repository/$ARTIFACTS_PATH/$APPNAME_SHORT/pipelin-id/$CI_PIPELINE_ID/$VERSION_FILE)
- git clone https://git.zubr.dev/shared/ci-scripts.git
- cd ci-scripts/deploy/cacherpc
- ansible --version
- ansible-playbook -i tt_stg -e "install_version=${PIPELINE_ID_VERSION}" --limit ${LIMIT_HOST} --private-key ~/.ssh/id_rsa deploy.yml
REMOTE_USER: "ci_ssh_solana_cacherpc"
APP_PATH: $BIN_PATH
APP_USER: sol
only:
when: manual
except:
- master
- tags
- tags


2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cache-rpc"
version = "0.2.14"
version = "0.2.15"
authors = ["Alexander Polakov <[email protected]>"]
edition = "2018"
license = "MIT"
Expand Down
7 changes: 7 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ pub struct Options {
parse(try_from_str = parse_identity)
)]
pub identity: Option<String>,
#[structopt(
long = "keep-alive",
help = "time duration, after which connection to server will be aborted, if no data is received over it",
default_value = "30s",
parse(try_from_str = humantime::parse_duration)
)]
pub keep_alive: Duration,
}

#[derive(Debug)]
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ pub mod types;

// Default queue sizes. `2 << 19` evaluates to 1_048_576 (2^20) and
// `2 << 18` to 524_288 (2^19).
// NOTE(review): "GAI"/"GPA" presumably stand for getAccountInfo /
// getProgramAccounts — confirm against the code that uses these constants.
const DEFAULT_GAI_QUEUE_SIZE: usize = 2 << 19;
const DEFAULT_GPA_QUEUE_SIZE: usize = 2 << 18;
// Default timeouts and backoffs as plain integers.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm
// where these are converted into a `Duration`.
const DEFAULT_GAI_TIMEOUT: u64 = 30;
const DEFAULT_GPA_TIMEOUT: u64 = 60;
const DEFAULT_GAI_BACKOFF: u64 = 30;
const DEFAULT_GPA_BACKOFF: u64 = 60;
const PASSTHROUGH_BACKOFF: u64 = 30;

/// Name of the environment variable that `main` reads via `std::env::var`
/// to choose the maximum tracing level. Values are lowercased before
/// matching; `debug`, `trace`, `warn` and `error` are recognised, and
/// anything else (or an unset variable) results in `INFO`.
pub const CACHER_LOG_LEVEL: &str = "CACHER_LOG_LEVEL";
29 changes: 27 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::http::header;
use cache_rpc::control::{handle_command, run_control_interface, ControlState, RpcConfigSender};
use cache_rpc::CACHER_LOG_LEVEL;
use either::Either;
use std::cell::RefCell;
use std::path::PathBuf;
Expand Down Expand Up @@ -50,26 +51,43 @@ async fn main() -> Result<()> {
};
let (writer, _guard) = tracing_appender::non_blocking(writer);

let level = std::env::var(CACHER_LOG_LEVEL)
.ok()
.map(|level| {
use tracing::Level;
match level.to_lowercase().as_str() {
"debug" => Level::DEBUG,
"trace" => Level::TRACE,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
}
})
.unwrap_or(tracing::Level::INFO);
match options.log_format {
cli::LogFormat::Json => {
let subscriber = fmt::Subscriber::builder()
.with_thread_names(true)
.with_writer(writer)
.with_max_level(level)
.with_timer(fmt::time::ChronoLocal::rfc3339())
.json()
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
cli::LogFormat::Plain => {
let subscriber = fmt::Subscriber::builder().with_writer(writer).finish();
let subscriber = fmt::Subscriber::builder()
.with_max_level(level)
.with_writer(writer)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
};

let span = tracing::span!(tracing::Level::INFO, "global", version = %metrics::version());
let _enter = span.enter();

info!(?options, "configuration options");
info!(?options, "started cache-rpc with configuration");

run(options).await?;
Ok(())
Expand Down Expand Up @@ -248,6 +266,13 @@ async fn run(options: cli::Options) -> Result<()> {
)
.service(web::resource("/metrics").route(web::get().to(rpc::metrics_handler)))
})
.on_connect(|c, _| {
let stream = c.downcast_ref::<actix_web::rt::net::TcpStream>().unwrap();
let addr = stream.peer_addr().unwrap();
tracing::debug!(ip = %addr.ip(), port = %addr.port(), "got incomming connection");
})
.keep_alive(options.keep_alive)
.client_request_timeout(Duration::ZERO)
.bind(bind_addr)
.with_context(|| format!("failed to bind to {}", bind_addr))?
.run()
Expand Down
36 changes: 26 additions & 10 deletions src/pubsub/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,18 @@ impl AccountUpdateManager {
.send(awc::ws::Message::Ping(b"hello?".as_ref().into()))
.is_err()
{
warn!(actor_id = actor.actor_id, "failed to send ping");
warn!(
actor = actor.actor_id,
"failed to send websocket healthcheck ping, will terminate actor"
);
ctx.stop();
}

let elapsed = actor.last_received_at.elapsed();
if elapsed > WEBSOCKET_PING_TIMEOUT {
warn!(
actor_id = actor.actor_id,
"no messages received in {:?}, assume connection lost ({:?})",
actor=actor.actor_id,
"no messages received in {:?}, assume connection lost ({:?}), terminating actor",
elapsed,
actor.last_received_at
);
Expand Down Expand Up @@ -226,7 +229,7 @@ impl AccountUpdateManager {
.set(actor.inflight.len() as i64);
if dead_requests > DEAD_REQUEST_LIMIT {
warn!(
message = "too many dead requests, disconnecting",
message = "too many dead requests, disconnecting from websocket server",
count = dead_requests
);
ctx.stop();
Expand All @@ -248,7 +251,10 @@ impl AccountUpdateManager {
.map(|diff| diff > self.config.slot_distance as u64)
.unwrap_or(false);
if behind {
error!("websocket slot behind rpc, stopping");
error!(
actor=%self.actor_id,
"websocket slot behind rpc, terminating websocket connection"
);
ctx.stop()
}
}
Expand Down Expand Up @@ -334,11 +340,14 @@ impl AccountUpdateManager {
.write(actixws::Message::Ping(b"check connection".as_ref().into()))
.is_err()
{
error!(actor_id, "failed to send check msg");
error!(
actor = actor_id,
"failed to send healthcheck message to websocket server, terminating actor"
);
ctx.stop();
return;
};
info!(actor_id, message = "websocket ping sent");
debug!(actor = actor_id, "websocket ping sent");

ctx.add_stream(stream);

Expand All @@ -351,15 +360,21 @@ impl AccountUpdateManager {
}
}

info!(actor_id, message = "websocket stream added");
info!(
actor = actor_id,
"websocket data stream has been added to actor"
);
metrics()
.websocket_connected
.with_label_values(&[&actor.actor_name])
.set(1);
actor.last_received_at = Instant::now();
});
ctx.wait(fut);
info!(self.actor_id, message = "connection future complete");
debug!(
actor = self.actor_id,
message = "websocket connection future complete"
);
self.update_status();
}

Expand Down Expand Up @@ -520,7 +535,7 @@ impl AccountUpdateManager {
ctx.cancel_future(handle);
}
None => {
debug!(key = %sub.key(), "filter added");
debug!(pubkey=%sub.key(), "new filter added for program");
metrics()
.filters
.with_label_values(&[&self.actor_name])
Expand Down Expand Up @@ -687,6 +702,7 @@ impl Handler<ForceReconnect> for AccountUpdateManager {

/// Handles a `ForceReconnect` message: logs a warning tagged with this
/// actor's id and then stops the actor's context.
///
/// The message payload itself is ignored.
fn handle(&mut self, _: ForceReconnect, ctx: &mut Self::Context) -> Self::Result {
// Stopping the context is what forces the reconnect: per the original
// author's note, the actor is expected to be restarted afterwards,
// re-establishing the connection and resetting the cache.
// NOTE(review): the restart logic is not visible in this hunk — confirm.
tracing::warn!(actor=%self.actor_id, "forcing websocket reconnection on actor");
ctx.stop();
}
}
Expand Down
Loading

0 comments on commit 3fdb7c7

Please sign in to comment.