Skip to content

Commit a53c929

Browse files
committed
Change behaviour to use default credentials (#135)
Also fix a bug in stream metadata management
1 parent c416a32 commit a53c929

File tree

6 files changed

+102
-63
lines changed

6 files changed

+102
-63
lines changed

server/src/banner.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ fn status_info(scheme: &str, opt: &option::Opt) {
3737
let url = format!("{}://{}", scheme, opt.address).underlined();
3838
eprintln!(
3939
"
40+
{}
41+
{}
4042
{}",
41-
format!("Parseable server started at {}", url).bold(),
43+
format!("Parseable server started at: {}", url).bold(),
44+
format!("Username: {}", opt.username).bold(),
45+
format!("Password: {}", opt.password).bold(),
4246
)
4347
}
4448

@@ -72,13 +76,55 @@ fn storage_info(opt: &option::Opt) {
7276
}
7377

7478
fn warning(opt: &option::Opt) {
79+
if opt.s3_endpoint_url == option::DEFAULT_S3_URL
80+
&& opt.username == option::DEFAULT_USERNAME
81+
&& opt.password == option::DEFAULT_PASSWORD
82+
{
83+
warning_line();
84+
cred_warning(opt);
85+
s3_warning(opt);
86+
} else if opt.s3_endpoint_url == option::DEFAULT_S3_URL {
87+
warning_line();
88+
s3_warning(opt);
89+
} else if opt.username == option::DEFAULT_USERNAME && opt.password == option::DEFAULT_PASSWORD {
90+
warning_line();
91+
cred_warning(opt);
92+
}
93+
}
94+
95+
fn warning_line() {
96+
eprintln!(
97+
"
98+
{}",
99+
"Warning:".to_string().red().bold(),
100+
);
101+
}
102+
103+
fn cred_warning(opt: &option::Opt) {
104+
if opt.username == option::DEFAULT_USERNAME && opt.password == option::DEFAULT_PASSWORD {
105+
eprintln!(
106+
"
107+
{}
108+
{}",
109+
"Parseable server is using default credentials."
110+
.to_string()
111+
.red(),
112+
format!(
113+
"Setup your credentials with {} and {} before storing production logs.",
114+
option::USERNAME_ENV,
115+
option::PASSOWRD_ENV
116+
)
117+
.red()
118+
)
119+
}
120+
}
121+
122+
fn s3_warning(opt: &option::Opt) {
75123
if opt.s3_endpoint_url == option::DEFAULT_S3_URL {
76124
eprintln!(
77125
"
78-
{}
79126
{}
80127
{}",
81-
"Warning:".to_string().red().bold(),
82128
"Parseable server is using default object storage backend with public access."
83129
.to_string()
84130
.red(),

server/src/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,6 @@ pub enum Error {
6565
StreamLock,
6666
#[error("Metadata not found for log stream: {0}")]
6767
StreamMetaNotFound(String),
68-
#[error("Schema not found for log stream: {0}")]
69-
SchemaNotFound(String),
70-
#[error("Alert config not found for log stream: {0}")]
71-
AlertConfigNotFound(String),
7268
#[error("Invalid alert config: {0}")]
7369
InvalidAlert(String),
7470
}

server/src/handler.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ pub async fn cache_query(_req: HttpRequest, query: web::Json<query::Query>) -> H
8585
}
8686
}
8787

88+
//
89+
// **** Stream related handlers ****
90+
//
8891
pub async fn delete_stream(req: HttpRequest) -> HttpResponse {
8992
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
9093
if let Err(e) = validator::stream_name(&stream_name) {
@@ -137,6 +140,10 @@ pub async fn list_streams(_: HttpRequest) -> impl Responder {
137140
response::list_response(storage::list_streams().unwrap())
138141
}
139142

143+
//
144+
// **** Steam metadata related handlers ****
145+
//
146+
// Get log stream schema
140147
pub async fn get_schema(req: HttpRequest) -> HttpResponse {
141148
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
142149
// fail to proceed if there is an error in log stream name validation
@@ -246,22 +253,24 @@ pub async fn put_stream(req: HttpRequest) -> HttpResponse {
246253
.to_http();
247254
}
248255

249-
return response::ServerResponse {
250-
msg: format!("Created log stream {}", stream_name),
251-
code: StatusCode::OK,
256+
if let Err(e) = metadata::STREAM_INFO.add_stream(
257+
stream_name.to_string(),
258+
"".to_string(),
259+
"".to_string(),
260+
) {
261+
return response::ServerResponse {
262+
msg: format!(
263+
"Failed to add log stream {} to metadata due to err: {}",
264+
stream_name, e
265+
),
266+
code: StatusCode::INTERNAL_SERVER_ERROR,
267+
}
268+
.to_http();
252269
}
253-
.to_http();
254-
}
255270

256-
if let Err(e) =
257-
metadata::STREAM_INFO.add_stream(stream_name.to_string(), "".to_string(), "".to_string())
258-
{
259271
return response::ServerResponse {
260-
msg: format!(
261-
"Failed to add log stream {} to metadata due to err: {}",
262-
stream_name, e
263-
),
264-
code: StatusCode::INTERNAL_SERVER_ERROR,
272+
msg: format!("Created log stream {}", stream_name),
273+
code: StatusCode::OK,
265274
}
266275
.to_http();
267276
}
@@ -324,7 +333,7 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
324333
}
325334

326335
pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) -> HttpResponse {
327-
let stream_name: String = req.match_info().get("log stream").unwrap().parse().unwrap();
336+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
328337

329338
let labels = collect_labels(&req);
330339

server/src/main.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ async fn validator(
7979
credentials: BasicAuth,
8080
) -> Result<ServiceRequest, actix_web::Error> {
8181
let opt = option::get_opts();
82-
if credentials.user_id().trim() == opt.username.unwrap()
83-
&& credentials.password().unwrap().trim() == opt.password.unwrap()
82+
if credentials.user_id().trim() == opt.username
83+
&& credentials.password().unwrap().trim() == opt.password
8484
{
8585
return Ok(req);
8686
}
@@ -100,27 +100,15 @@ async fn run_http(opt: option::Opt) -> anyhow::Result<()> {
100100
(_, _) => None,
101101
};
102102

103-
if opt.username.is_some() && opt.password.is_some() {
104-
let http_server =
105-
HttpServer::new(move || create_app!().wrap(HttpAuthentication::basic(validator)));
106-
if let Some(builder) = ssl_acceptor {
107-
http_server
108-
.bind_openssl(opt_clone.address, builder)?
109-
.run()
110-
.await?;
111-
} else {
112-
http_server.bind(opt_clone.address)?.run().await?;
113-
}
103+
let http_server =
104+
HttpServer::new(move || create_app!().wrap(HttpAuthentication::basic(validator)));
105+
if let Some(builder) = ssl_acceptor {
106+
http_server
107+
.bind_openssl(opt_clone.address, builder)?
108+
.run()
109+
.await?;
114110
} else {
115-
let http_server = HttpServer::new(move || create_app!());
116-
if let Some(builder) = ssl_acceptor {
117-
http_server
118-
.bind_openssl(opt_clone.address, builder)?
119-
.run()
120-
.await?;
121-
} else {
122-
http_server.bind(opt_clone.address)?.run().await?;
123-
}
111+
http_server.bind(opt_clone.address)?.run().await?;
124112
}
125113

126114
Ok(())

server/src/metadata.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,7 @@ impl STREAM_INFO {
4949
drop(&map);
5050

5151
match meta {
52-
Some(meta) => {
53-
if meta.schema.clone() != "" {
54-
Ok(meta.schema.clone())
55-
} else {
56-
Err(Error::SchemaNotFound(stream_name))
57-
}
58-
}
52+
Some(meta) => Ok(meta.schema.clone()),
5953
None => Err(Error::StreamMetaNotFound(stream_name)),
6054
}
6155
}
@@ -66,13 +60,7 @@ impl STREAM_INFO {
6660
drop(&map);
6761

6862
match meta {
69-
Some(meta) => {
70-
if meta.alert.clone() != "" {
71-
Ok(meta.alert.clone())
72-
} else {
73-
Err(Error::AlertConfigNotFound(stream_name))
74-
}
75-
}
63+
Some(meta) => Ok(meta.alert.clone()),
7664
None => Err(Error::StreamMetaNotFound(stream_name)),
7765
}
7866
}
@@ -139,7 +127,11 @@ impl STREAM_INFO {
139127
alert_config = format!("{:?}", x);
140128
}
141129
}
142-
Err(e) => Err(Error::S3(e))?,
130+
Err(_) => {
131+
// Ignore S3 errors here, because
132+
// we are just trying to load the
133+
// stream metadata based on whatever is available
134+
}
143135
};
144136
match storage::stream_exists(&stream_name) {
145137
Ok(x) => {
@@ -149,7 +141,11 @@ impl STREAM_INFO {
149141
schema = format!("{:?}", x);
150142
}
151143
}
152-
Err(e) => Err(Error::S3(e))?,
144+
Err(_) => {
145+
// Ignore S3 errors here, because
146+
// we are just trying to load the
147+
// stream metadata based on whatever is available
148+
}
153149
};
154150
let metadata = LogStreamMetadata {
155151
schema: schema,

server/src/option.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ use structopt::StructOpt;
2121

2222
pub const DEFAULT_S3_URL: &str = "http://127.0.0.1:9000";
2323
pub const S3_URL_ENV_VAR: &str = "P_S3_URL";
24+
pub const USERNAME_ENV: &str = "P_USERNAME";
25+
pub const PASSOWRD_ENV: &str = "P_PASSWORD";
26+
pub const DEFAULT_USERNAME: &str = "parseable";
27+
pub const DEFAULT_PASSWORD: &str = "parseable";
2428

2529
#[derive(Debug, Clone, StructOpt)]
2630
#[structopt(
@@ -72,12 +76,12 @@ pub struct Opt {
7276
pub s3_bucket_name: String,
7377

7478
/// Optional username to enable basic auth on the server
75-
#[structopt(long, env = "P_USERNAME")]
76-
pub username: Option<String>,
79+
#[structopt(long, env = USERNAME_ENV, default_value = DEFAULT_USERNAME)]
80+
pub username: String,
7781

7882
/// Optional password to enable basic auth on the server
79-
#[structopt(long, env = "P_PASSWORD")]
80-
pub password: Option<String>,
83+
#[structopt(long, env = PASSOWRD_ENV, default_value = DEFAULT_PASSWORD)]
84+
pub password: String,
8185
}
8286

8387
pub fn get_opts() -> Opt {

0 commit comments

Comments
 (0)