Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ walkdir = { version = "2", optional = true }

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
crc-fast = { version = "1.6" , optional = true }
form_urlencoded = { version = "1.2", optional = true }
http-body-util = { version = "0.1.2", optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
Expand Down Expand Up @@ -79,7 +80,7 @@ cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream
azure = ["cloud", "httparse"]
fs = ["walkdir", "tokio"]
gcp = ["cloud", "rustls-pki-types"]
aws = ["cloud", "md-5"]
aws = ["cloud", "crc-fast", "md-5"]
http = ["cloud"]
tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
integration = ["rand", "tokio"]
Expand Down
5 changes: 5 additions & 0 deletions src/aws/checksum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ use std::str::FromStr;

#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
/// Enum representing the checksum algorithms supported by S3.
pub enum Checksum {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked and indeed CRC64NVME seems to be the default suggestion:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html

Seems like it was added to the official SDK about a year ago:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to change the enum, perhaps we can add all the other supported checksums as well (we don't have to actually support them, but we can minimize API churn)

I think that would mean adding the following variants (with comments that they aren't yet supported)

CRC32
CRC32C
SHA1
MD5

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb If the enum is marked as #[non_exhaustive] as I recommended above then adding new variants isn't a breaking change. I personally don't think it's a good idea to add variants which aren't actually supported.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking it as #[non_exhaustive] would also be fine

(my rationale for adding variants that are not yet supported was to 🎣 for help supporting them)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point. Changed to #[non_exhaustive]. If this goes through, adding more algorithm variants should be straightforward.

/// SHA-256 algorithm.
SHA256,
/// CRC64-NVME algorithm.
CRC64NVME,
Copy link
Copy Markdown

@orlp orlp Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically a breaking change because this enum was (erroneously IMO) not marked as #[non_exhaustive]. It's up to the maintainer - I personally don't believe anyone is matching on Checksum, it would only break code that currently does such a match.

But @kdn36 you should definitely mark this enum as #[non_exhaustive].

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we would have to wait for the next breaking object store release. We could discuss making such a change and quickly release 0.14 (next breaking change) if needed

}

/// Textual form of a [`Checksum`]: the lowercase algorithm name.
impl std::fmt::Display for Checksum {
    /// Writes `sha256` or `crc64nvme` to `f`, depending on the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the name first, then emit it in a single write; the match stays
        // exhaustive so a new variant forces an update here.
        let name = match *self {
            Self::SHA256 => "sha256",
            Self::CRC64NVME => "crc64nvme",
        };
        f.write_str(name)
    }
}
Expand All @@ -40,6 +44,7 @@ impl FromStr for Checksum {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"sha256" => Ok(Self::SHA256),
"crc64nvme" => Ok(Self::CRC64NVME),
_ => Err(()),
}
}
Expand Down
72 changes: 53 additions & 19 deletions src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use std::sync::Arc;

const VERSION_HEADER: &str = "x-amz-version-id";
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
const CRC64NVME_CHECKSUM: &str = "x-amz-checksum-crc64nvme";
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";
const STORAGE_CLASS: &str = "x-amz-storage-class";
Expand Down Expand Up @@ -398,19 +399,38 @@ impl Request<'_> {
}

pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
if (!self.config.skip_signature && self.config.sign_payload)
|| self.config.checksum.is_some()
{
use std::cell::LazyCell;

let sha256_digest = LazyCell::new(|| {
let mut sha256 = Context::new(&digest::SHA256);
payload.iter().for_each(|x| sha256.update(x));
let payload_sha256 = sha256.finish();
for part in &payload {
sha256.update(part);
}
sha256.finish()
});

if let Some(Checksum::SHA256) = self.config.checksum {
if !self.config.skip_signature && self.config.sign_payload {
self.payload_sha256 = Some(*sha256_digest);
}

match self.config.checksum {
Some(Checksum::SHA256) => {
self.builder = self
.builder
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(*sha256_digest));
}
Some(Checksum::CRC64NVME) => {
let crc_algo = crc_fast::CrcAlgorithm::Crc64Nvme;
let mut digest = crc_fast::Digest::new(crc_algo);
payload.iter().for_each(|x| digest.update(x));
let checksum = digest.finalize();

self.builder = self.builder.header(
CRC64NVME_CHECKSUM,
BASE64_STANDARD.encode(checksum.to_be_bytes()),
)
}
self.payload_sha256 = Some(payload_sha256);
None => {}
}

let content_length = payload.content_length();
Expand Down Expand Up @@ -658,6 +678,9 @@ impl S3Client {
Checksum::SHA256 => {
request = request.header(ALGORITHM, "SHA256");
}
Checksum::CRC64NVME => {
request = request.header(ALGORITHM, "CRC64NVME");
}
}
}
let response = request
Expand Down Expand Up @@ -715,32 +738,43 @@ impl S3Client {
}

let (parts, body) = request.send().await?.into_parts();
let (e_tag, checksum_sha256) = if is_copy {
let (e_tag, checksum_sha256, checksum_crc64nvme) = if is_copy {
let response = body
.bytes()
.await
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;
(response.e_tag, response.checksum_sha256)
(
response.e_tag,
response.checksum_sha256,
response.checksum_crc64nvme,
)
} else {
let e_tag = get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?;
let checksum_sha256 = parts
.headers
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
(e_tag, checksum_sha256)
let checksum_crc64nvme = parts
.headers
.get(CRC64NVME_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
(e_tag, checksum_sha256, checksum_crc64nvme)
};

let content_id = if self.config.checksum == Some(Checksum::SHA256) {
let meta = PartMetadata {
e_tag,
checksum_sha256,
};
quick_xml::se::to_string(&meta).unwrap()
} else {
e_tag
let content_id = match self.config.checksum {
Some(_) => {
let meta = PartMetadata {
e_tag,
checksum_sha256,
checksum_crc64nvme,
};
quick_xml::se::to_string(&meta).unwrap()
}
None => e_tag,
};

Ok(PartId { content_id })
Expand Down
138 changes: 93 additions & 45 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,53 +531,94 @@ mod tests {
maybe_skip_integration!();

let bucket = "test-bucket-for-checksum";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();
for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(checksum)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOptions::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOptions::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");
let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
store.delete(&path).await.unwrap();
}
}

#[tokio::test]
async fn copy_multipart_file_with_signature() {
maybe_skip_integration!();

let bucket = "test-bucket-for-copy-if-not-exists";
for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(checksum)
.with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
.build()
.unwrap();

let src = Path::parse("src.bin").unwrap();
let dst = Path::parse("dst.bin").unwrap();
store
.put(&src, PutPayload::from(vec![0u8; 100_000]))
.await
.unwrap();
if store.head(&dst).await.is_ok() {
store.delete(&dst).await.unwrap();
}
store.copy_if_not_exists(&src, &dst).await.unwrap();
store.delete(&src).await.unwrap();
store.delete(&dst).await.unwrap();
}
}

#[tokio::test]
async fn copy_multipart_file_with_signature_change_checksum() {
maybe_skip_integration!();

let bucket = "test-bucket-for-copy-if-not-exists";
let checksum_src = Checksum::SHA256;
let checksum_dst = Checksum::CRC64NVME;

let src = Path::parse("change_checksum_src.bin").unwrap();
let dst = Path::parse("change_checksum_dst.bin").unwrap();

let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
.with_checksum_algorithm(checksum_src)
.build()
.unwrap();

let src = Path::parse("src.bin").unwrap();
let dst = Path::parse("dst.bin").unwrap();
store
.put(&src, PutPayload::from(vec![0u8; 100_000]))
.await
.unwrap();
if store.head(&dst).await.is_ok() {
store.delete(&dst).await.unwrap();
}

let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(checksum_dst)
.with_copy_if_not_exists(S3CopyIfNotExists::Multipart)
.build()
.unwrap();

store.copy_if_not_exists(&src, &dst).await.unwrap();
store.delete(&src).await.unwrap();
store.delete(&dst).await.unwrap();
Expand All @@ -587,31 +628,33 @@ mod tests {
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();

let bucket = "test-object-lock";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();
for checksum in [Checksum::SHA256, Checksum::CRC64NVME] {
let bucket = "test-object-lock";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(checksum)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOptions::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();
let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOptions::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");
let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
store.delete(&path).await.unwrap();
}
}

#[tokio::test]
Expand Down Expand Up @@ -670,6 +713,11 @@ mod tests {
let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
put_get_delete_list(&integration).await;

// run integration test with checksum set to crc64nvme
let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::CRC64NVME);
let integration = builder.build().unwrap();
put_get_delete_list(&integration).await;
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) struct CopyPartResult {
pub e_tag: String,
#[serde(default, rename = "ChecksumSHA256")]
pub checksum_sha256: Option<String>,
#[serde(default, rename = "ChecksumCRC64NVME")]
pub checksum_crc64nvme: Option<String>,
}

#[derive(Debug, Serialize)]
Expand All @@ -113,6 +115,8 @@ pub(crate) struct PartMetadata {
pub e_tag: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_crc64nvme: Option<String>,
}

impl From<Vec<PartId>> for CompleteMultipartUpload {
Expand All @@ -127,12 +131,14 @@ impl From<Vec<PartId>> for CompleteMultipartUpload {
Err(_) => PartMetadata {
e_tag: part.content_id.clone(),
checksum_sha256: None,
checksum_crc64nvme: None,
},
};
MultipartPart {
e_tag: md.e_tag,
part_number: part_idx + 1,
checksum_sha256: md.checksum_sha256,
checksum_crc64nvme: md.checksum_crc64nvme,
}
})
.collect();
Expand All @@ -149,6 +155,9 @@ pub(crate) struct MultipartPart {
#[serde(rename = "ChecksumSHA256")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
#[serde(rename = "ChecksumCRC64NVME")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_crc64nvme: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand Down