Skip to content

Commit 74debd3

Browse files
committed
graph: Warn on log entry parse failures instead of silently skipping
1 parent f186318 commit 74debd3

File tree

3 files changed

+68
-8
lines changed

3 files changed

+68
-8
lines changed

graph/src/components/log_store/elasticsearch.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use async_trait::async_trait;
22
use reqwest::Client;
33
use serde::Deserialize;
44
use serde_json::json;
5+
use slog::{warn, Logger};
56
use std::collections::HashMap;
67
use std::time::Duration;
78

@@ -17,11 +18,13 @@ pub struct ElasticsearchLogStore {
1718
client: Client,
1819
index: String,
1920
timeout: Duration,
21+
logger: Logger,
2022
}
2123

2224
impl ElasticsearchLogStore {
2325
pub fn new(config: ElasticLoggingConfig, index: String, timeout: Duration) -> Self {
2426
Self {
27+
logger: crate::log::logger(false),
2528
endpoint: config.endpoint,
2629
username: config.username,
2730
password: config.password,
@@ -146,8 +149,21 @@ impl ElasticsearchLogStore {
146149
}
147150

148151
fn parse_log_entry(&self, source: ElasticsearchLogDocument) -> Option<LogEntry> {
149-
let level = source.level.parse().ok()?;
150-
let subgraph_id = DeploymentHash::new(&source.subgraph_id).ok()?;
152+
let level = match source.level.parse() {
153+
Ok(l) => l,
154+
Err(_) => {
155+
warn!(self.logger, "Invalid log level in Elasticsearch entry"; "level" => &source.level);
156+
return None;
157+
}
158+
};
159+
160+
let subgraph_id = match DeploymentHash::new(&source.subgraph_id) {
161+
Ok(id) => id,
162+
Err(_) => {
163+
warn!(self.logger, "Invalid subgraph ID in Elasticsearch entry"; "subgraph_id" => &source.subgraph_id);
164+
return None;
165+
}
166+
};
151167

152168
// Convert arguments HashMap to Vec<(String, String)>
153169
let arguments: Vec<(String, String)> = source.arguments.into_iter().collect();

graph/src/components/log_store/file.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use serde::{Deserialize, Serialize};
3+
use slog::{warn, Logger};
34
use std::cmp::Reverse;
45
use std::collections::BinaryHeap;
56
use std::fs::File;
@@ -13,6 +14,7 @@ use super::{LogEntry, LogMeta, LogQuery, LogStore, LogStoreError};
1314
pub struct FileLogStore {
1415
directory: PathBuf,
1516
retention_hours: u32,
17+
logger: Logger,
1618
}
1719

1820
impl FileLogStore {
@@ -24,6 +26,7 @@ impl FileLogStore {
2426
let store = Self {
2527
directory,
2628
retention_hours,
29+
logger: crate::log::logger(false),
2730
};
2831

2932
// Run cleanup on startup for all existing log files
@@ -53,10 +56,29 @@ impl FileLogStore {
5356

5457
/// Parse a JSON line into a LogEntry
5558
fn parse_line(&self, line: &str) -> Option<LogEntry> {
56-
let doc: FileLogDocument = serde_json::from_str(line).ok()?;
59+
let doc: FileLogDocument = match serde_json::from_str(line) {
60+
Ok(doc) => doc,
61+
Err(e) => {
62+
warn!(self.logger, "Failed to parse log line"; "error" => e.to_string());
63+
return None;
64+
}
65+
};
5766

58-
let level = doc.level.parse().ok()?;
59-
let subgraph_id = DeploymentHash::new(&doc.subgraph_id).ok()?;
67+
let level = match doc.level.parse() {
68+
Ok(l) => l,
69+
Err(_) => {
70+
warn!(self.logger, "Invalid log level"; "level" => &doc.level);
71+
return None;
72+
}
73+
};
74+
75+
let subgraph_id = match DeploymentHash::new(&doc.subgraph_id) {
76+
Ok(id) => id,
77+
Err(_) => {
78+
warn!(self.logger, "Invalid subgraph ID"; "subgraph_id" => &doc.subgraph_id);
79+
return None;
80+
}
81+
};
6082

6183
Some(LogEntry {
6284
id: doc.id,

graph/src/components/log_store/loki.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use async_trait::async_trait;
22
use reqwest::Client;
33
use serde::Deserialize;
4+
use slog::{warn, Logger};
45
use std::collections::HashMap;
56
use std::time::Duration;
67

@@ -14,6 +15,7 @@ pub struct LokiLogStore {
1415
username: Option<String>,
1516
password: Option<String>,
1617
client: Client,
18+
logger: Logger,
1719
}
1820

1921
impl LokiLogStore {
@@ -34,6 +36,7 @@ impl LokiLogStore {
3436
username,
3537
password,
3638
client,
39+
logger: crate::log::logger(false),
3740
})
3841
}
3942

@@ -146,10 +149,29 @@ impl LokiLogStore {
146149
) -> Option<LogEntry> {
147150
// value is [timestamp_ns, log_line]
148151
// We expect the log line to be JSON with our log entry structure
149-
let log_data: LokiLogDocument = serde_json::from_str(&value.1).ok()?;
152+
let log_data: LokiLogDocument = match serde_json::from_str(&value.1) {
153+
Ok(doc) => doc,
154+
Err(e) => {
155+
warn!(self.logger, "Failed to parse Loki log entry"; "error" => e.to_string());
156+
return None;
157+
}
158+
};
159+
160+
let level = match log_data.level.parse() {
161+
Ok(l) => l,
162+
Err(_) => {
163+
warn!(self.logger, "Invalid log level in Loki entry"; "level" => &log_data.level);
164+
return None;
165+
}
166+
};
150167

151-
let level = log_data.level.parse().ok()?;
152-
let subgraph_id = DeploymentHash::new(&log_data.subgraph_id).ok()?;
168+
let subgraph_id = match DeploymentHash::new(&log_data.subgraph_id) {
169+
Ok(id) => id,
170+
Err(_) => {
171+
warn!(self.logger, "Invalid subgraph ID in Loki entry"; "subgraph_id" => &log_data.subgraph_id);
172+
return None;
173+
}
174+
};
153175

154176
Some(LogEntry {
155177
id: log_data.id,

Comments (0)