diff --git a/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py b/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py index 34cb3f9050fc..307563868fe8 100644 --- a/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py +++ b/sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py @@ -279,6 +279,7 @@ def test_invalid_query_on_non_existent_collection(self): with self.assertRaises(Exception) as context: with TestPipeline() as p: + p.not_use_test_runner_api = True _ = (p | beam.Create(test_chunks) | Enrichment(handler)) expect_err_msg_contains = "collection not found" @@ -307,6 +308,7 @@ def test_invalid_query_on_non_existent_field(self): with self.assertRaises(Exception) as context: with TestPipeline() as p: + p.not_use_test_runner_api = True _ = (p | beam.Create(test_chunks) | Enrichment(handler)) expect_err_msg_contains = f"fieldName({non_existent_field}) not found" @@ -330,6 +332,7 @@ def test_empty_input_chunks(self): expected_chunks = [] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -458,6 +461,7 @@ def test_filtered_search_with_cosine_similarity_and_batching(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -563,6 +567,7 @@ def test_filtered_search_with_bm25_full_text_and_batching(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -704,6 +709,7 @@ def test_vector_search_with_euclidean_distance(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -844,6 +850,7 @@ def test_vector_search_with_inner_product_similarity(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -909,6 +916,7 @@ def test_keyword_search_with_inner_product_sparse_embedding(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( @@ -982,6 +990,7 @@ def test_hybrid_search(self): ] with TestPipeline() as p: + p.not_use_test_runner_api = True result = (p | beam.Create(test_chunks) | Enrichment(handler)) assert_that( result, lambda actual: MilvusTestHelpers.assert_chunks_equivalent( diff --git a/sdks/python/apache_beam/ml/rag/ingestion/milvus_search_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/milvus_search_it_test.py index b6e5083ea728..641b947374cb 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/milvus_search_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/milvus_search_it_test.py @@ -192,7 +192,6 @@ def tearDownClass(cls): def setUp(self): self.write_test_pipeline = TestPipeline() - self.write_test_pipeline.not_use_test_runner_api = True self._collection_name = f"test_collection_{self._testMethodName}" self._partition_name = f"test_partition_{self._testMethodName}" config = unpack_dataclass_with_kwargs(self._connection_config) @@ -232,6 +231,7 @@ def test_invalid_write_on_non_existent_collection(self): # Write pipeline. with self.assertRaises(Exception) as context: with TestPipeline() as p: + p.not_use_test_runner_api = True _ = (p | beam.Create(test_chunks) | config.create_write_transform()) # Assert on what should happen. @@ -252,6 +252,7 @@ def test_invalid_write_on_non_existent_partition(self): # Write pipeline. with self.assertRaises(Exception) as context: with TestPipeline() as p: + p.not_use_test_runner_api = True _ = (p | beam.Create(test_chunks) | config.create_write_transform()) # Assert on what should happen. @@ -287,6 +288,7 @@ def test_invalid_write_on_missing_primary_key_in_entity(self): # Write pipeline. with self.assertRaises(Exception) as context: with TestPipeline() as p: + p.not_use_test_runner_api = True _ = (p | beam.Create(test_chunks) | config.create_write_transform()) # Assert on what should happen. @@ -332,6 +334,7 @@ def test_write_on_auto_id_primary_key(self): config = MilvusVectorWriterConfig( connection_params=self._connection_config, write_config=write_config) + self.write_test_pipeline.not_use_test_runner_api = True with self.write_test_pipeline as p: _ = (p | beam.Create(test_chunks) | config.create_write_transform()) @@ -357,6 +360,7 @@ def test_write_on_existent_collection_with_default_schema(self): config = MilvusVectorWriterConfig( connection_params=self._connection_config, write_config=write_config) + self.write_test_pipeline.not_use_test_runner_api = True with self.write_test_pipeline as p: _ = (p | beam.Create(test_chunks) | config.create_write_transform()) @@ -422,6 +426,7 @@ def test_write_with_custom_column_specifications(self): write_config=write_config, column_specs=custom_column_specs) + self.write_test_pipeline.not_use_test_runner_api = True with self.write_test_pipeline as p: _ = (p | beam.Create(test_chunks) | config.create_write_transform()) @@ -474,6 +479,7 @@ def test_write_with_batching(self): config = MilvusVectorWriterConfig( connection_params=self._connection_config, write_config=write_config) + self.write_test_pipeline.not_use_test_runner_api = True with self.write_test_pipeline as p: _ = (p | beam.Create(test_chunks) | config.create_write_transform()) diff --git a/sdks/python/apache_beam/ml/rag/test_utils.py b/sdks/python/apache_beam/ml/rag/test_utils.py index f4acb105892c..31022cad5e15 100644 --- a/sdks/python/apache_beam/ml/rag/test_utils.py +++ b/sdks/python/apache_beam/ml/rag/test_utils.py @@ -87,7 +87,8 @@ class CustomMilvusContainer(MilvusContainer): Extends MilvusContainer to provide custom port binding and environment configuration for testing with standalone Milvus instances. """ - def __init__( + + def __init__( # pylint: disable=bad-super-call self, image: str, service_container_port, @@ -96,7 +97,11 @@ def __init__( ) -> None: # Skip the parent class's constructor and go straight to # GenericContainer. - super(MilvusContainer, self).__init__(image=image, **kwargs) + super( + MilvusContainer, + self, + ).__init__( + image=image, **kwargs) self.port = service_container_port self.healthcheck_port = healthcheck_container_port self.with_exposed_ports(service_container_port, healthcheck_container_port) @@ -133,6 +138,27 @@ class MilvusTestHelpers: # https://milvus.io/docs/release_notes.md or PyPI at # https://pypi.org/project/pymilvus/ for version compatibility. # Example: Milvus v2.6.0 requires pymilvus==2.6.0 (exact match required). + @staticmethod + def _wait_for_milvus_grpc(uri: str) -> None: + """Wait until Milvus accepts RPCs. + + Docker may report started before gRPC is ready. + """ + def list_collections_probe(): + client = MilvusClient(uri=uri) + try: + client.list_collections() + finally: + client.close() + + retry_with_backoff( + list_collections_probe, + max_retries=25, + retry_delay=2.0, + retry_backoff_factor=1.2, + operation_name="Milvus client connection after container start", + exception_types=(MilvusException, )) + @staticmethod def start_db_container( image="milvusdb/milvus:v2.5.10", @@ -148,23 +174,31 @@ def start_db_container( if tc_max_retries is not None: testcontainers_config.max_tries = tc_max_retries for i in range(vector_client_max_retries): + vector_db_container: Optional[CustomMilvusContainer] = None try: vector_db_container = CustomMilvusContainer( image=image, service_container_port=service_container_port, healthcheck_container_port=healthcheck_container_port) - vector_db_container = vector_db_container.with_volume_mapping( + mapped_container = vector_db_container.with_volume_mapping( cfg, "/milvus/configs/user.yaml") - vector_db_container.start() - host = vector_db_container.get_container_host_ip() - port = vector_db_container.get_exposed_port(service_container_port) - info = VectorDBContainerInfo(vector_db_container, host, port) + assert mapped_container is not None + running_container: CustomMilvusContainer = mapped_container + vector_db_container = running_container + running_container.start() + host = running_container.get_container_host_ip() + port = running_container.get_exposed_port(service_container_port) + info = VectorDBContainerInfo(running_container, host, port) + MilvusTestHelpers._wait_for_milvus_grpc(info.uri) _LOGGER.info( "milvus db container started successfully on %s.", info.uri) + break except Exception as e: - stdout_logs, stderr_logs = vector_db_container.get_logs() - stdout_logs = stdout_logs.decode("utf-8") - stderr_logs = stderr_logs.decode("utf-8") + stdout_logs = stderr_logs = "" + if vector_db_container is not None: + raw_out, raw_err = vector_db_container.get_logs() + stdout_logs = raw_out.decode("utf-8") + stderr_logs = raw_err.decode("utf-8") _LOGGER.warning( "Retry %d/%d: Failed to start Milvus DB container. Reason: %s. " "STDOUT logs:\n%s\nSTDERR logs:\n%s", diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 534082ddab37..7aad41e0607a 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -719,13 +719,23 @@ def test(self, providers=providers): # default arg to capture loop value **yaml_transform.SafeLineLoader.strip_metadata( fixture.get('config', {})))) for pipeline_spec in spec['pipelines']: - with beam.Pipeline(options=PipelineOptions( - pickle_library='cloudpickle', - **replace_recursive(yaml_transform.SafeLineLoader.strip_metadata( - pipeline_spec.get('options', {})), - vars))) as p: - yaml_transform.expand_pipeline( - p, replace_recursive(pipeline_spec, vars)) + try: + with beam.Pipeline(options=PipelineOptions( + pickle_library='cloudpickle', + **replace_recursive( + yaml_transform.SafeLineLoader.strip_metadata( + pipeline_spec.get('options', {})), + vars))) as p: + yaml_transform.expand_pipeline( + p, replace_recursive(pipeline_spec, vars)) + except ValueError as exn: + # FnApiRunner currently does not support this requirement in + # some xlang scenarios (e.g. Iceberg YAML pipelines). + if 'beam:requirement:pardo:on_window_expiration:v1' in str(exn): + self.skipTest( + 'Runner does not support ' + 'beam:requirement:pardo:on_window_expiration:v1') + raise yield f'test_{suffix}', test diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d364744b211a..d4b70af5e88c 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -162,10 +162,16 @@ def cythonize(*args, **kwargs): milvus_dependency = ['pymilvus>=2.5.10,<3.0.0'] -ml_base = [ +# google-adk / OpenTelemetry require protobuf>=5; tensorflow-transform in +# ml_test is pinned to versions that require protobuf<5 on Python 3.10. Those +# cannot be installed together, so ADK deps stay out of ml_test (use ml_base). +ml_base_core = [ 'embeddings>=0.0.4', # 0.0.3 crashes setuptools - 'google-adk', 'onnxruntime', + # onnx 1.12–1.13 cap protobuf in ways that trigger huge backtracking with + # Beam[gcp]+ml_test; pip can fall back to onnx 1.11 sdist which needs cmake. + # 1.14.1+ matches tf2onnx>=1.16 and ships manylinux wheels for py3.10. + 'onnx>=1.14.1,<2', 'langchain', 'sentence-transformers>=2.2.2', 'skl2onnx', @@ -174,11 +180,24 @@ def cythonize(*args, **kwargs): # tensorflow transitive dep, lower versions not compatible with Python3.10+ 'absl-py>=0.12.0', 'tensorflow-hub', - 'tf2onnx', 'torch', 'transformers', ] +ml_adk_dependency = [ + 'google-adk==1.28.1', + # proto-plus<1.24 caps protobuf<5; opentelemetry-proto (via ADK) needs + # protobuf>=5. Scoped here so the main dependency list stays broader. + 'proto-plus>=1.26.1,<2', + 'opentelemetry-api==1.37.0', + 'opentelemetry-sdk==1.37.0', + 'opentelemetry-exporter-otlp-proto-http==1.37.0', + # protobuf>=5 (ADK/OTel); tf2onnx 1.16.x pins protobuf~=3.20 only. + 'tf2onnx>=1.17.0,<1.18', +] + +ml_base = ml_base_core + ml_adk_dependency + def find_by_ext(root_dir, ext): for root, _, files in os.walk(root_dir): @@ -392,6 +411,9 @@ def get_portability_package_data(): 'packaging>=22.0', 'pillow>=12.1.1,<13', 'pymongo>=3.8.0,<5.0.0', + # ADK / OpenTelemetry need proto-plus>=1.26.1 (protobuf>=5); that + # floor is declared on ml_adk_dependency only so core installs stay + # compatible with older proto-plus. 'proto-plus>=1.7.1,<2', # 1. Use a tighter upper bound in protobuf dependency to make sure # the minor version at job submission @@ -447,9 +469,11 @@ def get_portability_package_data(): 'mock>=1.0.1,<6.0.0', 'pandas<2.3.0', 'parameterized>=0.7.1,<0.10.0', + 'pydot>=1.2.0,<2', 'pyhamcrest>=1.9,!=1.10.0,<3.0.0', 'requests_mock>=1.7,<2.0', - 'tenacity>=8.0.0,<9', + # google-adk 1.28+ requires tenacity>=9,<10 (conflicts with <9). + 'tenacity>=8.0.0,<10', 'pytest>=7.1.2,<9.0', 'pytest-xdist>=2.5.0,<4', 'pytest-timeout>=2.1.0,<3', @@ -547,14 +571,14 @@ def get_portability_package_data(): # TFT->TFX-BSL require pandas 1.x, which is not compatible # with numpy 2.x 'numpy<2', - # To help with dependency resolution in test suite. Revise once - # https://github.com/apache/beam/issues/37854 is fixed - 'protobuf<4; python_version<"3.11"' # Comment out xgboost as it is breaking presubmit python ml # tests due to tag check introduced since pip 24.2 # https://github.com/apache/beam/issues/31285 # 'xgboost<2.0', # https://github.com/apache/beam/issues/31252 - ] + ml_base, + # tft needs protobuf<5; tf2onnx 1.17+ allows protobuf 5 on the + # ADK-only path. + 'tf2onnx>=1.16.1,<1.17', + ] + ml_base_core, 'p310_ml_test': [ 'datatable', ] + ml_base,