Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion betamessageutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ func (acc *BetaMessage) Accumulate(event BetaRawMessageStreamEventUnion) error {
}
case BetaRawMessageStopEvent:
// Re-marshal the accumulated message to update JSON.raw so that AsAny()
// returns the accumulated data rather than the original stream data
// returns the accumulated data rather than the original stream data.
// Sanitize any invalid json.RawMessage fields first — partial JSON
// accumulated from InputJSONDelta events would cause json.Marshal to
// fail (see https://github.com/anthropics/anthropic-sdk-go/issues/255).
for i := range acc.Content {
sanitizeInputJSON(&acc.Content[i].Input)
}
accJSON, err := json.Marshal(acc)
if err != nil {
return fmt.Errorf("error converting accumulated message to JSON: %w", err)
Expand All @@ -82,6 +88,7 @@ func (acc *BetaMessage) Accumulate(event BetaRawMessageStreamEventUnion) error {
return fmt.Errorf("received event of type %s but there was no content block", event.Type)
}
contentBlock := &acc.Content[len(acc.Content)-1]
sanitizeInputJSON(&contentBlock.Input)
cbJSON, err := json.Marshal(contentBlock)
if err != nil {
return fmt.Errorf("error converting content block to JSON: %w", err)
Expand Down
24 changes: 24 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,30 @@ Therefore, the answer is..."}}`,
{Type: "redacted_thinking", Data: "Redacted"},
}},
},
"tool use block with truncated input json": {
events: []string{
`{"type": "message_start", "message": {}}`,
`{"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_id", "name": "tool_name", "input": {}}}`,
`{"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\"argument\":"}}`,
`{"type": "content_block_stop", "index": 0}`,
`{"type": "message_stop"}`,
},
expected: anthropic.Message{Content: []anthropic.ContentBlockUnion{
{Type: "tool_use", ID: "toolu_id", Name: "tool_name", Input: json.RawMessage("null")},
}},
},
"tool use block with malformed input json": {
events: []string{
`{"type": "message_start", "message": {}}`,
`{"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_id", "name": "tool_name", "input": {}}}`,
`{"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\"key s"}}`,
`{"type": "content_block_stop", "index": 0}`,
`{"type": "message_stop"}`,
},
expected: anthropic.Message{Content: []anthropic.ContentBlockUnion{
{Type: "tool_use", ID: "toolu_id", Name: "tool_name", Input: json.RawMessage("null")},
}},
},
"multiple content blocks": {
events: []string{
`{"type": "message_start", "message": {}}`,
Expand Down
24 changes: 23 additions & 1 deletion messageutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error {
}
case MessageStopEvent:
// Re-marshal the accumulated message to update JSON.raw so that AsAny()
// returns the accumulated data rather than the original stream data
// returns the accumulated data rather than the original stream data.
// Sanitize any invalid json.RawMessage fields first — partial JSON
// accumulated from InputJSONDelta events would cause json.Marshal to
// fail (see https://github.com/anthropics/anthropic-sdk-go/issues/255).
for i := range acc.Content {
sanitizeInputJSON(&acc.Content[i].Input)
}
accJSON, err := json.Marshal(acc)
if err != nil {
return fmt.Errorf("error converting accumulated message to JSON: %w", err)
Expand All @@ -78,6 +84,7 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error {
return fmt.Errorf("received event of type %s but there was no content block", event.Type)
}
contentBlock := &acc.Content[len(acc.Content)-1]
sanitizeInputJSON(&contentBlock.Input)
cbJSON, err := json.Marshal(contentBlock)
if err != nil {
return fmt.Errorf("error converting content block to JSON: %w", err)
Expand All @@ -88,6 +95,21 @@ func (acc *Message) Accumulate(event MessageStreamEventUnion) error {
return nil
}

// sanitizeInputJSON replaces the contents of input with null if it
// contains invalid JSON. InputJSONDelta events accumulate partial
// JSON strings into a json.RawMessage by byte concatenation. If the
// stream is interrupted or the API sends malformed chunks, the
// accumulated bytes may not form valid JSON. json.Marshal validates
// json.RawMessage contents via json.Compact, so marshaling a content
// block with invalid Input would fail. Replacing with null allows
// the marshal to succeed — callers can detect the issue by checking
// for a null Input on tool_use blocks.
func sanitizeInputJSON(input *json.RawMessage) {
if len(*input) > 0 && !json.Valid(*input) {
*input = json.RawMessage("null")
}
}

// ToParam converters

func (r Message) ToParam() MessageParam {
Expand Down