langfuse.openai
If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import.
```diff
- import openai
+ from langfuse.openai import openai
```
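For illustration, a minimal usage sketch (assumes Langfuse and OpenAI credentials are provided via the standard environment variables; the model name and prompt are placeholders):

```python
# Drop-in replacement: the wrapped module is used exactly like the OpenAI SDK.
from langfuse.openai import openai

completion = openai.chat.completions.create(
    model="gpt-4o",  # placeholder model name
    messages=[{"role": "user", "content": "Hello, world!"}],
    # Optional Langfuse-specific kwargs, consumed by the wrapper and not sent to OpenAI:
    name="hello-world-generation",
    metadata={"env": "dev"},
)
print(completion.choices[0].message.content)
```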
Langfuse automatically tracks:
- All prompts/completions with support for streaming, async and functions
- Latencies
- API Errors
- Model usage (tokens) and cost (USD)
The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK.
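As a sketch of the decorator interop: a function decorated with `observe()` opens a trace, and the wrapped OpenAI call inside it is recorded as a nested generation on that trace (model name and prompt are placeholders):

```python
from langfuse.decorators import observe
from langfuse.openai import openai

@observe()  # opens a trace; the OpenAI generation below is nested inside it
def summarize(text: str) -> str:
    response = openai.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=[{"role": "user", "content": f"Summarize: {text}"}],
    )
    return response.choices[0].message.content
```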
See docs for more details: https://langfuse.com/docs/integrations/openai
1"""If you use the OpenAI Python SDK, you can use the Langfuse drop-in replacement to get full logging by changing only the import. 2 3```diff 4- import openai 5+ from langfuse.openai import openai 6``` 7 8Langfuse automatically tracks: 9 10- All prompts/completions with support for streaming, async and functions 11- Latencies 12- API Errors 13- Model usage (tokens) and cost (USD) 14 15The integration is fully interoperable with the `observe()` decorator and the low-level tracing SDK. 16 17See docs for more details: https://langfuse.com/docs/integrations/openai 18""" 19 20import logging 21import types 22from collections import defaultdict 23from dataclasses import dataclass 24from inspect import isclass 25from typing import Optional 26 27from openai._types import NotGiven 28from packaging.version import Version 29from pydantic import BaseModel 30from wrapt import wrap_function_wrapper 31 32from langfuse import Langfuse 33from langfuse.client import StatefulGenerationClient 34from langfuse.decorators import langfuse_context 35from langfuse.media import LangfuseMedia 36from langfuse.utils import _get_timestamp 37from langfuse.utils.langfuse_singleton import LangfuseSingleton 38 39try: 40 import openai 41except ImportError: 42 raise ModuleNotFoundError( 43 "Please install OpenAI to use this feature: 'pip install openai'" 44 ) 45 46try: 47 from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI # noqa: F401 48except ImportError: 49 AsyncAzureOpenAI = None 50 AsyncOpenAI = None 51 AzureOpenAI = None 52 OpenAI = None 53 54log = logging.getLogger("langfuse") 55 56 57@dataclass 58class OpenAiDefinition: 59 module: str 60 object: str 61 method: str 62 type: str 63 sync: bool 64 min_version: Optional[str] = None 65 max_version: Optional[str] = None 66 67 68OPENAI_METHODS_V0 = [ 69 OpenAiDefinition( 70 module="openai", 71 object="ChatCompletion", 72 method="create", 73 type="chat", 74 sync=True, 75 ), 76 OpenAiDefinition( 77 module="openai", 78 object="Completion", 79 method="create", 80 type="completion", 81 sync=True, 82 ), 83] 84 85 86OPENAI_METHODS_V1 = [ 87 OpenAiDefinition( 88 module="openai.resources.chat.completions", 89 object="Completions", 90 method="create", 91 type="chat", 92 sync=True, 93 ), 94 OpenAiDefinition( 95 module="openai.resources.completions", 96 object="Completions", 97 method="create", 98 type="completion", 99 sync=True, 100 ), 101 OpenAiDefinition( 102 module="openai.resources.chat.completions", 103 object="AsyncCompletions", 104 method="create", 105 type="chat", 106 sync=False, 107 ), 108 OpenAiDefinition( 109 module="openai.resources.completions", 110 object="AsyncCompletions", 111 method="create", 112 type="completion", 113 sync=False, 114 ), 115 OpenAiDefinition( 116 module="openai.resources.beta.chat.completions", 117 object="Completions", 118 method="parse", 119 type="chat", 120 sync=True, 121 min_version="1.50.0", 122 max_version="1.92.0", 123 ), 124 OpenAiDefinition( 125 module="openai.resources.beta.chat.completions", 126 object="AsyncCompletions", 127 method="parse", 128 type="chat", 129 sync=False, 130 min_version="1.50.0", 131 max_version="1.92.0", 132 ), 133 OpenAiDefinition( 134 module="openai.resources.chat.completions", 135 object="Completions", 136 method="parse", 137 type="chat", 138 sync=True, 139 min_version="1.92.0", 140 ), 141 OpenAiDefinition( 142 module="openai.resources.chat.completions", 143 object="AsyncCompletions", 144 method="parse", 145 type="chat", 146 sync=False, 147 min_version="1.92.0", 148 ), 149 OpenAiDefinition( 150 
module="openai.resources.responses", 151 object="Responses", 152 method="create", 153 type="chat", 154 sync=True, 155 min_version="1.66.0", 156 ), 157 OpenAiDefinition( 158 module="openai.resources.responses", 159 object="AsyncResponses", 160 method="create", 161 type="chat", 162 sync=False, 163 min_version="1.66.0", 164 ), 165] 166 167 168class OpenAiArgsExtractor: 169 def __init__( 170 self, 171 name=None, 172 metadata=None, 173 trace_id=None, 174 session_id=None, 175 user_id=None, 176 tags=None, 177 parent_observation_id=None, 178 langfuse_prompt=None, # we cannot use prompt because it's an argument of the old OpenAI completions API 179 **kwargs, 180 ): 181 self.args = {} 182 self.args["name"] = name 183 self.args["metadata"] = ( 184 metadata 185 if "response_format" not in kwargs 186 else { 187 **(metadata or {}), 188 "response_format": kwargs["response_format"].model_json_schema() 189 if isclass(kwargs["response_format"]) 190 and issubclass(kwargs["response_format"], BaseModel) 191 else kwargs["response_format"], 192 } 193 ) 194 self.args["trace_id"] = trace_id 195 self.args["session_id"] = session_id 196 self.args["user_id"] = user_id 197 self.args["tags"] = tags 198 self.args["parent_observation_id"] = parent_observation_id 199 self.args["langfuse_prompt"] = langfuse_prompt 200 self.kwargs = kwargs 201 202 def get_langfuse_args(self): 203 return {**self.args, **self.kwargs} 204 205 def get_openai_args(self): 206 # If OpenAI model distillation is enabled, we need to add the metadata to the kwargs 207 # https://platform.openai.com/docs/guides/distillation 208 if self.kwargs.get("store", False): 209 self.kwargs["metadata"] = ( 210 {} if self.args.get("metadata", None) is None else self.args["metadata"] 211 ) 212 213 # OpenAI does not support non-string type values in metadata when using 214 # model distillation feature 215 self.kwargs["metadata"].pop("response_format", None) 216 217 return self.kwargs 218 219 220def _langfuse_wrapper(func): 221 def _with_langfuse(open_ai_definitions, initialize): 222 def wrapper(wrapped, instance, args, kwargs): 223 return func(open_ai_definitions, initialize, wrapped, args, kwargs) 224 225 return wrapper 226 227 return _with_langfuse 228 229 230def _extract_chat_prompt(kwargs: any): 231 """Extracts the user input from prompts. 
Returns an array of messages or dict with messages and functions""" 232 prompt = {} 233 234 if kwargs.get("functions") is not None: 235 prompt.update({"functions": kwargs["functions"]}) 236 237 if kwargs.get("function_call") is not None: 238 prompt.update({"function_call": kwargs["function_call"]}) 239 240 if kwargs.get("tools") is not None: 241 prompt.update({"tools": kwargs["tools"]}) 242 243 if prompt: 244 # uf user provided functions, we need to send these together with messages to langfuse 245 prompt.update( 246 { 247 "messages": [ 248 _process_message(message) for message in kwargs.get("messages", []) 249 ], 250 } 251 ) 252 return prompt 253 else: 254 # vanilla case, only send messages in openai format to langfuse 255 return [_process_message(message) for message in kwargs.get("messages", [])] 256 257 258def _process_message(message): 259 if not isinstance(message, dict): 260 return message 261 262 processed_message = {**message} 263 264 content = processed_message.get("content", None) 265 if not isinstance(content, list): 266 return processed_message 267 268 processed_content = [] 269 270 for content_part in content: 271 if content_part.get("type") == "input_audio": 272 audio_base64 = content_part.get("input_audio", {}).get("data", None) 273 format = content_part.get("input_audio", {}).get("format", "wav") 274 275 if audio_base64 is not None: 276 base64_data_uri = f"data:audio/{format};base64,{audio_base64}" 277 278 processed_content.append( 279 { 280 "type": "input_audio", 281 "input_audio": { 282 "data": LangfuseMedia(base64_data_uri=base64_data_uri), 283 "format": format, 284 }, 285 } 286 ) 287 else: 288 processed_content.append(content_part) 289 290 processed_message["content"] = processed_content 291 292 return processed_message 293 294 295def _extract_chat_response(kwargs: any): 296 """Extracts the llm output from the response.""" 297 response = { 298 "role": kwargs.get("role", None), 299 } 300 301 audio = None 302 303 if kwargs.get("function_call") is not None: 304 response.update({"function_call": kwargs["function_call"]}) 305 306 if kwargs.get("tool_calls") is not None: 307 response.update({"tool_calls": kwargs["tool_calls"]}) 308 309 if kwargs.get("audio") is not None: 310 audio = kwargs["audio"].__dict__ 311 312 if "data" in audio and audio["data"] is not None: 313 base64_data_uri = f"data:audio/{audio.get('format', 'wav')};base64,{audio.get('data', None)}" 314 audio["data"] = LangfuseMedia(base64_data_uri=base64_data_uri) 315 316 response.update( 317 { 318 "content": kwargs.get("content", None), 319 } 320 ) 321 322 if audio is not None: 323 response.update({"audio": audio}) 324 325 return response 326 327 328def _get_langfuse_data_from_kwargs( 329 resource: OpenAiDefinition, langfuse: Langfuse, start_time, kwargs 330): 331 name = kwargs.get("name", "OpenAI-generation") 332 333 if name is None: 334 name = "OpenAI-generation" 335 336 if name is not None and not isinstance(name, str): 337 raise TypeError("name must be a string") 338 339 decorator_context_observation_id = langfuse_context.get_current_observation_id() 340 decorator_context_trace_id = langfuse_context.get_current_trace_id() 341 342 trace_id = kwargs.get("trace_id", None) or decorator_context_trace_id 343 if trace_id is not None and not isinstance(trace_id, str): 344 raise TypeError("trace_id must be a string") 345 346 session_id = kwargs.get("session_id", None) 347 if session_id is not None and not isinstance(session_id, str): 348 raise TypeError("session_id must be a string") 349 350 user_id = 
kwargs.get("user_id", None) 351 if user_id is not None and not isinstance(user_id, str): 352 raise TypeError("user_id must be a string") 353 354 tags = kwargs.get("tags", None) 355 if tags is not None and ( 356 not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags) 357 ): 358 raise TypeError("tags must be a list of strings") 359 360 # Update trace params in decorator context if specified in openai call 361 if decorator_context_trace_id: 362 langfuse_context.update_current_trace( 363 session_id=session_id, user_id=user_id, tags=tags 364 ) 365 366 parent_observation_id = kwargs.get("parent_observation_id", None) or ( 367 decorator_context_observation_id 368 if decorator_context_observation_id != decorator_context_trace_id 369 else None 370 ) 371 if parent_observation_id is not None and not isinstance(parent_observation_id, str): 372 raise TypeError("parent_observation_id must be a string") 373 if parent_observation_id is not None and trace_id is None: 374 raise ValueError("parent_observation_id requires trace_id to be set") 375 376 metadata = kwargs.get("metadata", {}) 377 378 if metadata is not None and not isinstance(metadata, dict): 379 raise TypeError("metadata must be a dictionary") 380 381 model = kwargs.get("model", None) or None 382 383 prompt = None 384 385 if resource.type == "completion": 386 prompt = kwargs.get("prompt", None) 387 388 elif resource.object == "Responses": 389 prompt = kwargs.get("input", None) 390 391 elif resource.type == "chat": 392 prompt = _extract_chat_prompt(kwargs) 393 394 is_nested_trace = False 395 if trace_id: 396 is_nested_trace = True 397 langfuse.trace(id=trace_id, session_id=session_id, user_id=user_id, tags=tags) 398 else: 399 trace_id = ( 400 decorator_context_trace_id 401 or langfuse.trace( 402 session_id=session_id, 403 user_id=user_id, 404 tags=tags, 405 name=name, 406 input=prompt, 407 metadata=metadata, 408 ).id 409 ) 410 411 parsed_temperature = ( 412 kwargs.get("temperature", 1) 413 if not isinstance(kwargs.get("temperature", 1), NotGiven) 414 else 1 415 ) 416 417 parsed_max_tokens = ( 418 kwargs.get("max_tokens", float("inf")) 419 if not isinstance(kwargs.get("max_tokens", float("inf")), NotGiven) 420 else float("inf") 421 ) 422 423 parsed_top_p = ( 424 kwargs.get("top_p", 1) 425 if not isinstance(kwargs.get("top_p", 1), NotGiven) 426 else 1 427 ) 428 429 parsed_frequency_penalty = ( 430 kwargs.get("frequency_penalty", 0) 431 if not isinstance(kwargs.get("frequency_penalty", 0), NotGiven) 432 else 0 433 ) 434 435 parsed_presence_penalty = ( 436 kwargs.get("presence_penalty", 0) 437 if not isinstance(kwargs.get("presence_penalty", 0), NotGiven) 438 else 0 439 ) 440 441 parsed_seed = ( 442 kwargs.get("seed", None) 443 if not isinstance(kwargs.get("seed", None), NotGiven) 444 else None 445 ) 446 447 parsed_n = kwargs.get("n", 1) if not isinstance(kwargs.get("n", 1), NotGiven) else 1 448 449 modelParameters = { 450 "temperature": parsed_temperature, 451 "max_tokens": parsed_max_tokens, # casing? 
452 "top_p": parsed_top_p, 453 "frequency_penalty": parsed_frequency_penalty, 454 "presence_penalty": parsed_presence_penalty, 455 } 456 if parsed_n is not None and parsed_n > 1: 457 modelParameters["n"] = parsed_n 458 459 if parsed_seed is not None: 460 modelParameters["seed"] = parsed_seed 461 462 langfuse_prompt = kwargs.get("langfuse_prompt", None) 463 464 return { 465 "name": name, 466 "metadata": metadata, 467 "trace_id": trace_id, 468 "parent_observation_id": parent_observation_id, 469 "user_id": user_id, 470 "start_time": start_time, 471 "input": prompt, 472 "model_parameters": modelParameters, 473 "model": model or None, 474 "prompt": langfuse_prompt, 475 }, is_nested_trace 476 477 478def _create_langfuse_update( 479 completion, 480 generation: StatefulGenerationClient, 481 completion_start_time, 482 model=None, 483 usage=None, 484 metadata=None, 485): 486 update = { 487 "end_time": _get_timestamp(), 488 "output": completion, 489 "completion_start_time": completion_start_time, 490 } 491 if model is not None: 492 update["model"] = model 493 494 if metadata is not None: 495 update["metadata"] = metadata 496 497 if usage is not None: 498 parsed_usage = _parse_usage(usage) 499 500 update["usage"] = parsed_usage 501 update["usage_details"] = parsed_usage 502 503 generation.update(**update) 504 505 506def _parse_usage(usage=None): 507 if usage is None: 508 return 509 510 usage_dict = usage.copy() if isinstance(usage, dict) else usage.__dict__.copy() 511 512 for tokens_details in [ 513 "prompt_tokens_details", 514 "completion_tokens_details", 515 "input_token_details", 516 "output_token_details", 517 ]: 518 if tokens_details in usage_dict and usage_dict[tokens_details] is not None: 519 tokens_details_dict = ( 520 usage_dict[tokens_details] 521 if isinstance(usage_dict[tokens_details], dict) 522 else usage_dict[tokens_details].__dict__ 523 ) 524 usage_dict[tokens_details] = { 525 k: v for k, v in tokens_details_dict.items() if v is not None 526 } 527 528 return usage_dict 529 530 531def _extract_streamed_response_api_response(chunks): 532 completion, model, usage = None, None, None 533 metadata = {} 534 535 for raw_chunk in chunks: 536 chunk = raw_chunk.__dict__ 537 if raw_response := chunk.get("response", None): 538 usage = chunk.get("usage", None) 539 response = raw_response.__dict__ 540 model = response.get("model") 541 542 for key, val in response.items(): 543 if key not in ["created_at", "model", "output", "usage", "text"]: 544 metadata[key] = val 545 546 if key == "output": 547 output = val 548 if not isinstance(output, list): 549 completion = output 550 elif len(output) > 1: 551 completion = output 552 elif len(output) == 1: 553 completion = output[0] 554 555 return (model, completion, usage, metadata) 556 557 558def _extract_streamed_openai_response(resource, chunks): 559 completion = defaultdict(str) if resource.type == "chat" else "" 560 model, usage = None, None 561 562 for chunk in chunks: 563 if _is_openai_v1(): 564 chunk = chunk.__dict__ 565 566 model = model or chunk.get("model", None) or None 567 usage = chunk.get("usage", None) 568 569 choices = chunk.get("choices", []) 570 571 for choice in choices: 572 if _is_openai_v1(): 573 choice = choice.__dict__ 574 if resource.type == "chat": 575 delta = choice.get("delta", None) 576 577 if _is_openai_v1(): 578 delta = delta.__dict__ 579 580 if delta.get("role", None) is not None: 581 completion["role"] = delta["role"] 582 583 if delta.get("content", None) is not None: 584 completion["content"] = ( 585 delta.get("content", None) 
586 if completion["content"] is None 587 else completion["content"] + delta.get("content", None) 588 ) 589 elif delta.get("function_call", None) is not None: 590 curr = completion["function_call"] 591 tool_call_chunk = delta.get("function_call", None) 592 593 if not curr: 594 completion["function_call"] = { 595 "name": getattr(tool_call_chunk, "name", ""), 596 "arguments": getattr(tool_call_chunk, "arguments", ""), 597 } 598 599 else: 600 curr["name"] = curr["name"] or getattr( 601 tool_call_chunk, "name", None 602 ) 603 curr["arguments"] += getattr(tool_call_chunk, "arguments", "") 604 605 elif delta.get("tool_calls", None) is not None: 606 curr = completion["tool_calls"] 607 tool_call_chunk = getattr( 608 delta.get("tool_calls", None)[0], "function", None 609 ) 610 611 if not curr: 612 completion["tool_calls"] = [ 613 { 614 "name": getattr(tool_call_chunk, "name", ""), 615 "arguments": getattr(tool_call_chunk, "arguments", ""), 616 } 617 ] 618 619 elif getattr(tool_call_chunk, "name", None) is not None: 620 curr.append( 621 { 622 "name": getattr(tool_call_chunk, "name", None), 623 "arguments": getattr( 624 tool_call_chunk, "arguments", None 625 ), 626 } 627 ) 628 629 else: 630 curr[-1]["name"] = curr[-1]["name"] or getattr( 631 tool_call_chunk, "name", None 632 ) 633 curr[-1]["arguments"] += getattr( 634 tool_call_chunk, "arguments", None 635 ) 636 637 if resource.type == "completion": 638 completion += choice.get("text", "") 639 640 def get_response_for_chat(): 641 return ( 642 completion["content"] 643 or ( 644 completion["function_call"] 645 and { 646 "role": "assistant", 647 "function_call": completion["function_call"], 648 } 649 ) 650 or ( 651 completion["tool_calls"] 652 and { 653 "role": "assistant", 654 # "tool_calls": [{"function": completion["tool_calls"]}], 655 "tool_calls": [ 656 {"function": data} for data in completion["tool_calls"] 657 ], 658 } 659 ) 660 or None 661 ) 662 663 return ( 664 model, 665 get_response_for_chat() if resource.type == "chat" else completion, 666 usage, 667 None, 668 ) 669 670 671def _get_langfuse_data_from_default_response(resource: OpenAiDefinition, response): 672 if response is None: 673 return None, "<NoneType response returned from OpenAI>", None 674 675 model = response.get("model", None) or None 676 677 completion = None 678 679 if resource.type == "completion": 680 choices = response.get("choices", []) 681 if len(choices) > 0: 682 choice = choices[-1] 683 684 completion = choice.text if _is_openai_v1() else choice.get("text", None) 685 686 elif resource.object == "Responses": 687 output = response.get("output", {}) 688 689 if not isinstance(output, list): 690 completion = output 691 elif len(output) > 1: 692 completion = output 693 elif len(output) == 1: 694 completion = output[0] 695 696 elif resource.type == "chat": 697 choices = response.get("choices", []) 698 if len(choices) > 0: 699 # If multiple choices were generated, we'll show all of them in the UI as a list. 
700 if len(choices) > 1: 701 completion = [ 702 _extract_chat_response(choice.message.__dict__) 703 if _is_openai_v1() 704 else choice.get("message", None) 705 for choice in choices 706 ] 707 else: 708 choice = choices[0] 709 completion = ( 710 _extract_chat_response(choice.message.__dict__) 711 if _is_openai_v1() 712 else choice.get("message", None) 713 ) 714 715 usage = _parse_usage(response.get("usage", None)) 716 717 return (model, completion, usage) 718 719 720def _is_openai_v1(): 721 return Version(openai.__version__) >= Version("1.0.0") 722 723 724def _is_streaming_response(response): 725 return ( 726 isinstance(response, types.GeneratorType) 727 or isinstance(response, types.AsyncGeneratorType) 728 or (_is_openai_v1() and isinstance(response, openai.Stream)) 729 or (_is_openai_v1() and isinstance(response, openai.AsyncStream)) 730 ) 731 732 733@_langfuse_wrapper 734def _wrap(open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs): 735 new_langfuse: Langfuse = initialize() 736 737 start_time = _get_timestamp() 738 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 739 740 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 741 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 742 ) 743 generation = new_langfuse.generation(**generation) 744 try: 745 openai_response = wrapped(**arg_extractor.get_openai_args()) 746 747 if _is_streaming_response(openai_response): 748 return LangfuseResponseGeneratorSync( 749 resource=open_ai_resource, 750 response=openai_response, 751 generation=generation, 752 langfuse=new_langfuse, 753 is_nested_trace=is_nested_trace, 754 ) 755 756 else: 757 model, completion, usage = _get_langfuse_data_from_default_response( 758 open_ai_resource, 759 (openai_response and openai_response.__dict__) 760 if _is_openai_v1() 761 else openai_response, 762 ) 763 generation.update( 764 model=model, 765 output=completion, 766 end_time=_get_timestamp(), 767 usage=usage, # backward compat for all V2 self hosters 768 usage_details=usage, 769 ) 770 771 # Avoiding the trace-update if trace-id is provided by user. 
772 if not is_nested_trace: 773 new_langfuse.trace(id=generation.trace_id, output=completion) 774 775 return openai_response 776 except Exception as ex: 777 log.warning(ex) 778 model = kwargs.get("model", None) or None 779 generation.update( 780 end_time=_get_timestamp(), 781 status_message=str(ex), 782 level="ERROR", 783 model=model, 784 usage={ 785 "input_cost": 0, 786 "output_cost": 0, 787 "total_cost": 0, 788 }, # backward compat for all V2 self hosters 789 cost_details={"input": 0, "output": 0, "total": 0}, 790 ) 791 raise ex 792 793 794@_langfuse_wrapper 795async def _wrap_async( 796 open_ai_resource: OpenAiDefinition, initialize, wrapped, args, kwargs 797): 798 new_langfuse = initialize() 799 start_time = _get_timestamp() 800 arg_extractor = OpenAiArgsExtractor(*args, **kwargs) 801 802 generation, is_nested_trace = _get_langfuse_data_from_kwargs( 803 open_ai_resource, new_langfuse, start_time, arg_extractor.get_langfuse_args() 804 ) 805 generation = new_langfuse.generation(**generation) 806 try: 807 openai_response = await wrapped(**arg_extractor.get_openai_args()) 808 809 if _is_streaming_response(openai_response): 810 return LangfuseResponseGeneratorAsync( 811 resource=open_ai_resource, 812 response=openai_response, 813 generation=generation, 814 langfuse=new_langfuse, 815 is_nested_trace=is_nested_trace, 816 ) 817 818 else: 819 model, completion, usage = _get_langfuse_data_from_default_response( 820 open_ai_resource, 821 (openai_response and openai_response.__dict__) 822 if _is_openai_v1() 823 else openai_response, 824 ) 825 generation.update( 826 model=model, 827 output=completion, 828 end_time=_get_timestamp(), 829 usage=usage, # backward compat for all V2 self hosters 830 usage_details=usage, 831 ) 832 # Avoiding the trace-update if trace-id is provided by user. 833 if not is_nested_trace: 834 new_langfuse.trace(id=generation.trace_id, output=completion) 835 836 return openai_response 837 except Exception as ex: 838 model = kwargs.get("model", None) or None 839 generation.update( 840 end_time=_get_timestamp(), 841 status_message=str(ex), 842 level="ERROR", 843 model=model, 844 usage={ 845 "input_cost": 0, 846 "output_cost": 0, 847 "total_cost": 0, 848 }, # Backward compat for all V2 self hosters 849 cost_details={"input": 0, "output": 0, "total": 0}, 850 ) 851 raise ex 852 853 854class OpenAILangfuse: 855 _langfuse: Optional[Langfuse] = None 856 857 def initialize(self): 858 self._langfuse = LangfuseSingleton().get( 859 public_key=openai.langfuse_public_key, 860 secret_key=openai.langfuse_secret_key, 861 host=openai.langfuse_host, 862 debug=openai.langfuse_debug, 863 enabled=openai.langfuse_enabled, 864 sdk_integration="openai", 865 sample_rate=openai.langfuse_sample_rate, 866 environment=openai.langfuse_environment, 867 mask=openai.langfuse_mask, 868 ) 869 870 return self._langfuse 871 872 def flush(cls): 873 cls._langfuse.flush() 874 875 def langfuse_auth_check(self): 876 """Check if the provided Langfuse credentials (public and secret key) are valid. 877 878 Raises: 879 Exception: If no projects were found for the provided credentials. 880 881 Note: 882 This method is blocking. It is discouraged to use it in production code. 
883 """ 884 if self._langfuse is None: 885 self.initialize() 886 887 return self._langfuse.auth_check() 888 889 def register_tracing(self): 890 resources = OPENAI_METHODS_V1 if _is_openai_v1() else OPENAI_METHODS_V0 891 892 for resource in resources: 893 if resource.min_version is not None and Version( 894 openai.__version__ 895 ) < Version(resource.min_version): 896 continue 897 898 if resource.max_version is not None and Version( 899 openai.__version__ 900 ) >= Version(resource.max_version): 901 continue 902 903 wrap_function_wrapper( 904 resource.module, 905 f"{resource.object}.{resource.method}", 906 _wrap(resource, self.initialize) 907 if resource.sync 908 else _wrap_async(resource, self.initialize), 909 ) 910 911 setattr(openai, "langfuse_public_key", None) 912 setattr(openai, "langfuse_secret_key", None) 913 setattr(openai, "langfuse_host", None) 914 setattr(openai, "langfuse_debug", None) 915 setattr(openai, "langfuse_enabled", True) 916 setattr(openai, "langfuse_sample_rate", None) 917 setattr(openai, "langfuse_environment", None) 918 setattr(openai, "langfuse_mask", None) 919 setattr(openai, "langfuse_auth_check", self.langfuse_auth_check) 920 setattr(openai, "flush_langfuse", self.flush) 921 922 923modifier = OpenAILangfuse() 924modifier.register_tracing() 925 926 927# DEPRECATED: Use `openai.langfuse_auth_check()` instead 928def auth_check(): 929 if modifier._langfuse is None: 930 modifier.initialize() 931 932 return modifier._langfuse.auth_check() 933 934 935class LangfuseResponseGeneratorSync: 936 def __init__( 937 self, 938 *, 939 resource, 940 response, 941 generation, 942 langfuse, 943 is_nested_trace, 944 ): 945 self.items = [] 946 947 self.resource = resource 948 self.response = response 949 self.generation = generation 950 self.langfuse = langfuse 951 self.is_nested_trace = is_nested_trace 952 self.completion_start_time = None 953 954 def __iter__(self): 955 try: 956 for i in self.response: 957 self.items.append(i) 958 959 if self.completion_start_time is None: 960 self.completion_start_time = _get_timestamp() 961 962 yield i 963 finally: 964 self._finalize() 965 966 def __next__(self): 967 try: 968 item = self.response.__next__() 969 self.items.append(item) 970 971 if self.completion_start_time is None: 972 self.completion_start_time = _get_timestamp() 973 974 return item 975 976 except StopIteration: 977 self._finalize() 978 979 raise 980 981 def __enter__(self): 982 return self.__iter__() 983 984 def __exit__(self, exc_type, exc_value, traceback): 985 pass 986 987 def _finalize(self): 988 model, completion, usage, metadata = ( 989 _extract_streamed_response_api_response(self.items) 990 if self.resource.object == "Responses" 991 else _extract_streamed_openai_response(self.resource, self.items) 992 ) 993 994 # Avoiding the trace-update if trace-id is provided by user. 
995 if not self.is_nested_trace: 996 self.langfuse.trace(id=self.generation.trace_id, output=completion) 997 998 _create_langfuse_update( 999 completion, 1000 self.generation, 1001 self.completion_start_time, 1002 model=model, 1003 usage=usage, 1004 metadata=metadata, 1005 ) 1006 1007 1008class LangfuseResponseGeneratorAsync: 1009 def __init__( 1010 self, 1011 *, 1012 resource, 1013 response, 1014 generation, 1015 langfuse, 1016 is_nested_trace, 1017 ): 1018 self.items = [] 1019 1020 self.resource = resource 1021 self.response = response 1022 self.generation = generation 1023 self.langfuse = langfuse 1024 self.is_nested_trace = is_nested_trace 1025 self.completion_start_time = None 1026 1027 async def __aiter__(self): 1028 try: 1029 async for i in self.response: 1030 self.items.append(i) 1031 1032 if self.completion_start_time is None: 1033 self.completion_start_time = _get_timestamp() 1034 1035 yield i 1036 finally: 1037 await self._finalize() 1038 1039 async def __anext__(self): 1040 try: 1041 item = await self.response.__anext__() 1042 self.items.append(item) 1043 1044 if self.completion_start_time is None: 1045 self.completion_start_time = _get_timestamp() 1046 1047 return item 1048 1049 except StopAsyncIteration: 1050 await self._finalize() 1051 1052 raise 1053 1054 async def __aenter__(self): 1055 return self.__aiter__() 1056 1057 async def __aexit__(self, exc_type, exc_value, traceback): 1058 pass 1059 1060 async def _finalize(self): 1061 model, completion, usage, metadata = ( 1062 _extract_streamed_response_api_response(self.items) 1063 if self.resource.object == "Responses" 1064 else _extract_streamed_openai_response(self.resource, self.items) 1065 ) 1066 1067 # Avoiding the trace-update if trace-id is provided by user. 1068 if not self.is_nested_trace: 1069 self.langfuse.trace(id=self.generation.trace_id, output=completion) 1070 1071 _create_langfuse_update( 1072 completion, 1073 self.generation, 1074 self.completion_start_time, 1075 model=model, 1076 usage=usage, 1077 metadata=metadata, 1078 ) 1079 1080 async def close(self) -> None: 1081 """Close the response and release the connection. 1082 1083 Automatically called if the response body is read to completion. 1084 """ 1085 await self.response.close() 1086 1087 async def aclose(self) -> None: 1088 """Close the response and release the connection. 1089 1090 Automatically called if the response body is read to completion. 1091 """ 1092 await self.response.aclose()
log = <Logger langfuse (WARNING)>
@dataclass
class OpenAiDefinition:
    module: str
    object: str
    method: str
    type: str
    sync: bool
    min_version: Optional[str] = None
    max_version: Optional[str] = None
OPENAI_METHODS_V0 = [
    OpenAiDefinition(module='openai', object='ChatCompletion', method='create', type='chat', sync=True, min_version=None, max_version=None),
    OpenAiDefinition(module='openai', object='Completion', method='create', type='completion', sync=True, min_version=None, max_version=None),
]
OPENAI_METHODS_V1 = [
    OpenAiDefinition(module='openai.resources.chat.completions', object='Completions', method='create', type='chat', sync=True, min_version=None, max_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='Completions', method='create', type='completion', sync=True, min_version=None, max_version=None),
    OpenAiDefinition(module='openai.resources.chat.completions', object='AsyncCompletions', method='create', type='chat', sync=False, min_version=None, max_version=None),
    OpenAiDefinition(module='openai.resources.completions', object='AsyncCompletions', method='create', type='completion', sync=False, min_version=None, max_version=None),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='Completions', method='parse', type='chat', sync=True, min_version='1.50.0', max_version='1.92.0'),
    OpenAiDefinition(module='openai.resources.beta.chat.completions', object='AsyncCompletions', method='parse', type='chat', sync=False, min_version='1.50.0', max_version='1.92.0'),
    OpenAiDefinition(module='openai.resources.chat.completions', object='Completions', method='parse', type='chat', sync=True, min_version='1.92.0', max_version=None),
    OpenAiDefinition(module='openai.resources.chat.completions', object='AsyncCompletions', method='parse', type='chat', sync=False, min_version='1.92.0', max_version=None),
    OpenAiDefinition(module='openai.resources.responses', object='Responses', method='create', type='chat', sync=True, min_version='1.66.0', max_version=None),
    OpenAiDefinition(module='openai.resources.responses', object='AsyncResponses', method='create', type='chat', sync=False, min_version='1.66.0', max_version=None),
]
class OpenAiArgsExtractor:

Separates the Langfuse-specific arguments (name, metadata, trace_id, session_id, user_id, tags, parent_observation_id, langfuse_prompt) from the keyword arguments forwarded to the wrapped OpenAI method.
OpenAiArgsExtractor(name=None, metadata=None, trace_id=None, session_id=None, user_id=None, tags=None, parent_observation_id=None, langfuse_prompt=None, **kwargs)
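A brief sketch of the split the extractor performs (all values hypothetical):

```python
extractor = OpenAiArgsExtractor(
    name="support-answer",        # Langfuse-only argument
    metadata={"tenant": "acme"},  # Langfuse-only argument
    model="gpt-4o",               # forwarded to OpenAI
    messages=[{"role": "user", "content": "Hi"}],
)

extractor.get_langfuse_args()  # name, metadata, trace_id, ... plus the OpenAI kwargs
extractor.get_openai_args()    # only model/messages (metadata is re-added when store=True)
```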
def get_openai_args(self):

Returns the kwargs to pass to the wrapped OpenAI method. If OpenAI model distillation is enabled (store=True), the Langfuse metadata is also forwarded to OpenAI, minus any response_format entry, since OpenAI only accepts string values in metadata for distillation.
class OpenAILangfuse:

Manages the singleton Langfuse client and registers the tracing wrappers around the OpenAI SDK methods.
def initialize(self):

Creates (or reuses) the singleton Langfuse client, configured from the openai.langfuse_* module attributes, and returns it.
def langfuse_auth_check(self):
Check if the provided Langfuse credentials (public and secret key) are valid.
Raises:
- Exception: If no projects were found for the provided credentials.
Note:
This method is blocking. Avoid calling it in production code.
def register_tracing(self):

Wraps every method listed in OPENAI_METHODS_V1 (or OPENAI_METHODS_V0 on openai < 1.0.0) that matches the installed OpenAI version, then exposes the langfuse_* configuration attributes, langfuse_auth_check, and flush_langfuse on the openai module.
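Because register_tracing() runs at import time and exposes these attributes on the openai module, configuration can be done directly on the import. A sketch with placeholder keys:

```python
from langfuse.openai import openai

openai.langfuse_public_key = "pk-lf-..."  # placeholder credentials
openai.langfuse_secret_key = "sk-lf-..."
openai.langfuse_host = "https://cloud.langfuse.com"
openai.langfuse_enabled = True

openai.langfuse_auth_check()  # optional: verify credentials (blocking)

# ... make OpenAI calls as usual ...

openai.flush_langfuse()  # flush buffered events before the process exits
```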
modifier = <OpenAILangfuse object>

register_tracing() is invoked on this instance at import time, so importing the module is enough to activate tracing.
def auth_check():

Deprecated: use openai.langfuse_auth_check() instead.
class LangfuseResponseGeneratorSync:

Wraps a streamed OpenAI response so it can be iterated (or used as a context manager) as usual while Langfuse records the generation once the stream is exhausted.
LangfuseResponseGeneratorSync(*, resource, response, generation, langfuse, is_nested_trace)
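A sketch of the sync streaming path: with stream=True, the wrapped call returns this generator, which is consumed like a normal OpenAI stream and logged once exhausted (model name and prompt are placeholders):

```python
from langfuse.openai import openai

# stream=True makes the wrapped call return a LangfuseResponseGeneratorSync,
# which proxies the OpenAI stream and logs the generation when it ends.
stream = openai.chat.completions.create(
    model="gpt-4o",  # placeholder model name
    messages=[{"role": "user", "content": "Write a haiku."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```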
class LangfuseResponseGeneratorAsync:

Async counterpart of LangfuseResponseGeneratorSync: wraps a streamed async OpenAI response and records the generation once the stream is exhausted.
LangfuseResponseGeneratorAsync(*, resource, response, generation, langfuse, is_nested_trace)
async def close(self) -> None:
Close the response and release the connection.
Automatically called if the response body is read to completion.
async def aclose(self) -> None:
Close the response and release the connection.
Automatically called if the response body is read to completion.
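The async path works the same way with the re-exported AsyncOpenAI client; a sketch (model name and prompt are placeholders):

```python
import asyncio

from langfuse.openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    # The async stream is wrapped in a LangfuseResponseGeneratorAsync.
    stream = await client.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=[{"role": "user", "content": "Write a haiku."}],
        stream=True,
    )

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")


asyncio.run(main())
```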