-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-20391: Improve types returned by the Connect REST API #22001
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -184,13 +184,13 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin | |
| @GET | ||
| @Path("/{connector}/topics") | ||
| @Operation(summary = "Get the list of topics actively used by the specified connector") | ||
| public Response getConnectorActiveTopics(final @PathParam("connector") String connector) { | ||
| public Map<String, ActiveTopicsInfo> getConnectorActiveTopics(final @PathParam("connector") String connector) { | ||
| if (isTopicTrackingDisabled) { | ||
| throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), | ||
| "Topic tracking is disabled."); | ||
| } | ||
| ActiveTopicsInfo info = herder.connectorActiveTopics(connector); | ||
| return Response.ok(Map.of(info.connector(), info)).build(); | ||
| return Map.of(info.connector(), info); | ||
| } | ||
|
|
||
| @PUT | ||
|
|
@@ -234,15 +234,15 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto | |
|
|
||
| @PATCH | ||
| @Path("/{connector}/config") | ||
| public Response patchConnectorConfig(final @PathParam("connector") String connector, | ||
| public ConnectorInfo patchConnectorConfig(final @PathParam("connector") String connector, | ||
| final @Context HttpHeaders headers, | ||
| final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, | ||
| final Map<String, String> connectorConfigPatch) throws Throwable { | ||
| FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); | ||
| herder.patchConnectorConfig(connector, connectorConfigPatch, cb); | ||
| Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", | ||
| "PATCH", headers, connectorConfigPatch, new TypeReference<>() { }, new CreatedConnectorInfoTranslator(), forward); | ||
| return Response.ok().entity(createdInfo.result()).build(); | ||
| return createdInfo.result(); | ||
| } | ||
|
|
||
| @POST | ||
|
|
@@ -359,7 +359,7 @@ public ConnectorOffsets getOffsets(final @PathParam("connector") String connecto | |
| @PATCH | ||
| @Path("/{connector}/offsets") | ||
| @Operation(summary = "Alter the offsets for the specified connector") | ||
| public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, | ||
| public Message alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, | ||
| final @Context HttpHeaders headers, final @PathParam("connector") String connector, | ||
| final ConnectorOffsets offsets) throws Throwable { | ||
| if (offsets.offsets() == null || offsets.offsets().isEmpty()) { | ||
|
|
@@ -368,21 +368,19 @@ public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryPara | |
|
|
||
| FutureCallback<Message> cb = new FutureCallback<>(); | ||
| herder.alterConnectorOffsets(connector, offsets.toMap(), cb); | ||
| Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets, | ||
| return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets, | ||
| new TypeReference<>() { }, new IdentityTranslator<>(), forward); | ||
| return Response.ok().entity(msg).build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this return we are missing the status code now. Aren't these breaking changes and need a kip ? |
||
| } | ||
|
|
||
| @DELETE | ||
| @Path("/{connector}/offsets") | ||
| @Operation(summary = "Reset the offsets for the specified connector") | ||
| public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, | ||
| public Message resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, | ||
| final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable { | ||
| FutureCallback<Message> cb = new FutureCallback<>(); | ||
| herder.resetConnectorOffsets(connector, cb); | ||
| Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null, | ||
| return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null, | ||
| new TypeReference<>() { }, new IdentityTranslator<>(), forward); | ||
| return Response.ok().entity(msg).build(); | ||
| } | ||
|
|
||
| // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,7 +40,6 @@ | |
| import jakarta.ws.rs.Produces; | ||
| import jakarta.ws.rs.QueryParam; | ||
| import jakarta.ws.rs.core.MediaType; | ||
| import jakarta.ws.rs.core.Response; | ||
|
|
||
| /** | ||
| * A set of endpoints to adjust the log levels of runtime loggers. | ||
|
|
@@ -69,8 +68,8 @@ public LoggingResource(Herder herder) { | |
| */ | ||
| @GET | ||
| @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") | ||
| public Response listLoggers() { | ||
| return Response.ok(herder.allLoggerLevels()).build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it ok to loose the 'ok' status code ? |
||
| public Map<String, LoggerLevel> listLoggers() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, could be better if an entity (similar to what we have in org/apache/kafka/connect/runtime/rest/entities) is returned instead of a Map |
||
| return herder.allLoggerLevels(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -82,14 +81,14 @@ public Response listLoggers() { | |
| @GET | ||
| @Path("/{logger}") | ||
| @Operation(summary = "Get the log level for the specified logger") | ||
| public Response getLogger(final @PathParam("logger") String namedLogger) { | ||
| public LoggerLevel getLogger(final @PathParam("logger") String namedLogger) { | ||
| Objects.requireNonNull(namedLogger, "require non-null name"); | ||
|
|
||
| LoggerLevel loggerLevel = herder.loggerLevel(namedLogger); | ||
| if (loggerLevel == null) | ||
| throw new NotFoundException("Logger " + namedLogger + " not found."); | ||
|
|
||
| return Response.ok(loggerLevel).build(); | ||
| return loggerLevel; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -104,7 +103,7 @@ public Response getLogger(final @PathParam("logger") String namedLogger) { | |
| @Path("/{logger}") | ||
| @Operation(summary = "Set the log level for the specified logger") | ||
| @SuppressWarnings("fallthrough") | ||
| public Response setLevel(final @PathParam("logger") String namespace, | ||
| public List<String> setLevel(final @PathParam("logger") String namespace, | ||
| final Map<String, String> levelMap, | ||
| @DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) { | ||
| if (scope == null) { | ||
|
|
@@ -126,11 +125,10 @@ public Response setLevel(final @PathParam("logger") String namespace, | |
| default: | ||
| log.warn("Received invalid scope '{}' in request to adjust logging level; will default to {}", scope, WORKER_SCOPE); | ||
| case WORKER_SCOPE: | ||
| List<String> affectedLoggers = herder.setWorkerLoggerLevel(namespace, levelString); | ||
| return Response.ok(affectedLoggers).build(); | ||
| return herder.setWorkerLoggerLevel(namespace, levelString); | ||
| case CLUSTER_SCOPE: | ||
| herder.setClusterLoggerLevel(namespace, levelString); | ||
| return Response.noContent().build(); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To align with other endpoints, it is better to create an entity instead of Map