From 5f0117098ff66449d64d7f172374231845c871d9 Mon Sep 17 00:00:00 2001 From: Jamu Kakar Date: Mon, 30 Mar 2026 15:35:56 -0700 Subject: [PATCH] fix(accumulator): sanitize invalid JSON in Input before marshaling When InputJSONDelta events are accumulated into a content block's Input field (a json.RawMessage), the result may contain truncated or malformed JSON if the stream is interrupted or the API sends incomplete chunks. The subsequent json.Marshal call at ContentBlockStopEvent and MessageStopEvent fails because json.Compact validates the raw bytes. Add a sanitizeInputJSON helper that checks json.Valid before marshaling and replaces invalid Input with null. This allows Accumulate to succeed gracefully -- callers can detect the issue by checking for a null Input on tool_use blocks. Fixes #255 Co-Authored-By: Claude Opus 4.6 (1M context) --- betamessageutil.go | 9 ++++++++- message_test.go | 24 ++++++++++++++++++++++++ messageutil.go | 24 +++++++++++++++++++++++- 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/betamessageutil.go b/betamessageutil.go index 1348558b..c56665a7 100644 --- a/betamessageutil.go +++ b/betamessageutil.go @@ -69,7 +69,13 @@ func (acc *BetaMessage) Accumulate(event BetaRawMessageStreamEventUnion) error { } case BetaRawMessageStopEvent: // Re-marshal the accumulated message to update JSON.raw so that AsAny() - // returns the accumulated data rather than the original stream data + // returns the accumulated data rather than the original stream data. + // Sanitize any invalid json.RawMessage fields first — partial JSON + // accumulated from InputJSONDelta events would cause json.Marshal to + // fail (see https://github.com/anthropics/anthropic-sdk-go/issues/255). + for i := range acc.Content { + sanitizeInputJSON(&acc.Content[i].Input) + } accJSON, err := json.Marshal(acc) if err != nil { return fmt.Errorf("error converting accumulated message to JSON: %w", err) @@ -82,6 +88,7 @@ func (acc *BetaMessage) Accumulate(event BetaRawMessageStreamEventUnion) error { return fmt.Errorf("received event of type %s but there was no content block", event.Type) } contentBlock := &acc.Content[len(acc.Content)-1] + sanitizeInputJSON(&contentBlock.Input) cbJSON, err := json.Marshal(contentBlock) if err != nil { return fmt.Errorf("error converting content block to JSON: %w", err) diff --git a/message_test.go b/message_test.go index 288b1352..a28b8bfe 100644 --- a/message_test.go +++ b/message_test.go @@ -348,6 +348,30 @@ Therefore, the answer is..."}}`, {Type: "redacted_thinking", Data: "Redacted"}, }}, }, + "tool use block with truncated input json": { + events: []string{ + `{"type": "message_start", "message": {}}`, + `{"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_id", "name": "tool_name", "input": {}}}`, + `{"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\"argument\":"}}`, + `{"type": "content_block_stop", "index": 0}`, + `{"type": "message_stop"}`, + }, + expected: anthropic.Message{Content: []anthropic.ContentBlockUnion{ + {Type: "tool_use", ID: "toolu_id", Name: "tool_name", Input: json.RawMessage("null")}, + }}, + }, + "tool use block with malformed input json": { + events: []string{ + `{"type": "message_start", "message": {}}`, + `{"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_id", "name": "tool_name", "input": {}}}`, + `{"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\"key s"}}`, + `{"type": "content_block_stop", "index": 0}`, + `{"type": "message_stop"}`, + }, + expected: anthropic.Message{Content: []anthropic.ContentBlockUnion{ + {Type: "tool_use", ID: "toolu_id", Name: "tool_name", Input: json.RawMessage("null")}, + }}, + }, "multiple content blocks": { events: []string{ `{"type": "message_start", "message": {}}`, diff --git a/messageutil.go b/messageutil.go index e735e479..475a49d8 100644 --- a/messageutil.go +++ b/messageutil.go @@ -65,7 +65,13 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error { } case MessageStopEvent: // Re-marshal the accumulated message to update JSON.raw so that AsAny() - // returns the accumulated data rather than the original stream data + // returns the accumulated data rather than the original stream data. + // Sanitize any invalid json.RawMessage fields first — partial JSON + // accumulated from InputJSONDelta events would cause json.Marshal to + // fail (see https://github.com/anthropics/anthropic-sdk-go/issues/255). + for i := range acc.Content { + sanitizeInputJSON(&acc.Content[i].Input) + } accJSON, err := json.Marshal(acc) if err != nil { return fmt.Errorf("error converting accumulated message to JSON: %w", err) @@ -78,6 +84,7 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error { return fmt.Errorf("received event of type %s but there was no content block", event.Type) } contentBlock := &acc.Content[len(acc.Content)-1] + sanitizeInputJSON(&contentBlock.Input) cbJSON, err := json.Marshal(contentBlock) if err != nil { return fmt.Errorf("error converting content block to JSON: %w", err) @@ -88,6 +95,21 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error { return nil } +// sanitizeInputJSON replaces the contents of input with null if it +// contains invalid JSON. InputJSONDelta events accumulate partial +// JSON strings into a json.RawMessage by byte concatenation. If the +// stream is interrupted or the API sends malformed chunks, the +// accumulated bytes may not form valid JSON. json.Marshal validates +// json.RawMessage contents via json.Compact, so marshaling a content +// block with invalid Input would fail. Replacing with null allows +// the marshal to succeed — callers can detect the issue by checking +// for a null Input on tool_use blocks. +func sanitizeInputJSON(input *json.RawMessage) { + if len(*input) > 0 && !json.Valid(*input) { + *input = json.RawMessage("null") + } +} + // ToParam converters func (r Message) ToParam() MessageParam {