Skip to main content

OpenAI Provider

The OpenAI Provider implements the BaseModelProvider interface for the OpenAI API and any OpenAI-compatible endpoint (e.g., Azure OpenAI, local models, third-party services).

Installation

pip install ag_kit_py
The OpenAI provider uses langchain-openai for LangChain integration; that package is installed automatically alongside ag_kit_py.

Quick Start

Basic Usage

from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig

# Create configuration
config = ModelProviderConfig(
    api_key="sk-...",
    default_model="gpt-4o-mini",
    temperature=0.7
)

# Create provider
provider = OpenAIProvider(config)

# Get LangChain model
model = provider.get_langchain_model()

Using Factory Function

from ag_kit_py.providers import create_openai_provider

provider = create_openai_provider(
    api_key="sk-...",
    model="gpt-4o-mini",
    temperature=0.7
)

Using Environment Variables

import os
from ag_kit_py.providers import create_provider

# Set environment variables
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["OPENAI_MODEL"] = "gpt-4o-mini"

# Create provider (automatically loads from environment)
provider = create_provider("openai")

Configuration

ModelProviderConfig

# Import both the configuration class and the provider it is passed to
from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig

config = ModelProviderConfig(
    api_key="sk-...",                    # Required: OpenAI API key
    base_url=None,                       # Optional: Custom endpoint
    default_model="gpt-4o-mini",         # Optional: Default model
    temperature=0.7,                     # Optional: Sampling temperature
    timeout=30000,                       # Optional: Request timeout (ms)
    max_retries=3,                       # Optional: Max retry attempts
    retry_delay=1000,                    # Optional: Retry delay (ms)
    organization=None,                   # Optional: Organization ID
    project=None                         # Optional: Project ID
)

provider = OpenAIProvider(config)

Environment Variables

The OpenAI provider supports these environment variables:
# Required
OPENAI_API_KEY=sk-...

# Optional
OPENAI_MODEL=gpt-4o-mini
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_TEMPERATURE=0.7

API Reference

OpenAIProvider

class OpenAIProvider(BaseModelProvider):
    """OpenAI model provider implementation."""
    
    def __init__(self, config: ModelProviderConfig):
        """Initialize OpenAI provider.
        
        Args:
            config: Provider configuration
            
        Raises:
            ModelProviderError: If configuration is invalid
        """

Methods

get_provider_name()

Get the provider name.
def get_provider_name(self) -> str:
    """Get provider name.
    
    Returns:
        str: "openai"
    """

get_default_model()

Get the default model name.
def get_default_model(self) -> str:
    """Get default model.
    
    Returns:
        str: "gpt-4o-mini"
    """

get_langchain_model()

Get a LangChain ChatOpenAI model instance.
def get_langchain_model(self, **kwargs) -> ChatOpenAI:
    """Get LangChain ChatOpenAI model instance.
    
    This method returns a configured ChatOpenAI instance that can be used
    with LangGraph and other LangChain components.
    
    Args:
        **kwargs: Additional model configuration to override defaults
                 (e.g., model, temperature, max_tokens)
    
    Returns:
        ChatOpenAI: Configured ChatOpenAI instance
    """
Example:
from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig

config = ModelProviderConfig(api_key="sk-...", default_model="gpt-4o-mini")
provider = OpenAIProvider(config)

# Get default model
model = provider.get_langchain_model()

# Override configuration
model = provider.get_langchain_model(
    model="gpt-4o",
    temperature=0.9,
    max_tokens=2000
)

# Use with LangChain
from langchain_core.messages import HumanMessage

response = await model.ainvoke([HumanMessage(content="Hello!")])
print(response.content)

create_completion()

Create a non-streaming chat completion.
async def create_completion(
    self, 
    params: ChatCompletionParams
) -> ChatCompletion:
    """Create a chat completion.
    
    Args:
        params: Completion parameters
        
    Returns:
        ChatCompletion: Complete response with message content and metadata
        
    Raises:
        ModelProviderError: If completion fails
    """
Example:
from ag_kit_py.providers import ChatCompletionParams

params = ChatCompletionParams(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"}
    ],
    temperature=0.7,
    max_tokens=100
)

response = await provider.create_completion(params)
print(response.choices[0]["message"]["content"])

create_stream()

Create a streaming chat completion.
async def create_stream(
    self, 
    params: ChatCompletionParams
) -> AsyncIterator[ChatCompletionChunk]:
    """Create a streaming chat completion.
    
    Args:
        params: Completion parameters
        
    Yields:
        ChatCompletionChunk: Incremental response chunks
        
    Raises:
        ModelProviderError: If streaming fails
    """
Example:
from ag_kit_py.providers import ChatCompletionParams

params = ChatCompletionParams(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me a story"}],
    temperature=0.7,
    stream=True
)

async for chunk in provider.create_stream(params):
    content = chunk.choices[0]["delta"].get("content", "")
    if content:
        print(content, end="", flush=True)

format_tools()

Format tools for OpenAI API.
def format_tools(self, tools: List[Any]) -> List[Dict[str, Any]]:
    """Format tools for OpenAI API.
    
    Args:
        tools: List of tools in various formats
        
    Returns:
        List[Dict[str, Any]]: Formatted tool definitions
    """
Example:
tools = [
    {
        "name": "get_weather",
        "description": "Get current weather",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
            "required": ["location"]
        }
    }
]

formatted = provider.format_tools(tools)
# Returns:
# [{
#     "type": "function",
#     "function": {
#         "name": "get_weather",
#         "description": "Get current weather",
#         "parameters": {...}
#     }
# }]

parse_tool_calls()

Parse tool calls from completion response.
def parse_tool_calls(self, response: ChatCompletion) -> List[ToolCall]:
    """Parse tool calls from completion response.
    
    Args:
        response: Chat completion response
        
    Returns:
        List[ToolCall]: Parsed tool calls
    """
Example:
response = await provider.create_completion(params)
tool_calls = provider.parse_tool_calls(response)

for call in tool_calls:
    print(f"Tool: {call.function['name']}")
    print(f"Arguments: {call.function['arguments']}")

Usage Examples

Simple Chat

from ag_kit_py.providers import create_openai_provider, ChatCompletionParams

# Create provider
provider = create_openai_provider(api_key="sk-...")

# Create completion
params = ChatCompletionParams(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"}
    ]
)

response = await provider.create_completion(params)
print(response.choices[0]["message"]["content"])

Streaming Response

from ag_kit_py.providers import create_openai_provider, ChatCompletionParams

provider = create_openai_provider(api_key="sk-...")

params = ChatCompletionParams(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a poem about Python"}],
    temperature=0.8,
    stream=True
)

print("Streaming response:")
async for chunk in provider.create_stream(params):
    content = chunk.choices[0]["delta"].get("content", "")
    if content:
        print(content, end="", flush=True)
print("\n")

Tool/Function Calling

from ag_kit_py.providers import create_openai_provider, ChatCompletionParams
import json

provider = create_openai_provider(api_key="sk-...")

# Define tools
tools = [
    {
        "name": "get_weather",
        "description": "Get the current weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name, e.g., 'Paris'"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "Temperature unit"
                }
            },
            "required": ["location"]
        }
    }
]

# Request with tools
params = ChatCompletionParams(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    tool_choice="auto"
)

response = await provider.create_completion(params)

# Check for tool calls
tool_calls = provider.parse_tool_calls(response)
if tool_calls:
    for call in tool_calls:
        print(f"Tool: {call.function['name']}")
        args = json.loads(call.function['arguments'])
        print(f"Arguments: {args}")
        
        # Execute tool (mock)
        if call.function['name'] == "get_weather":
            result = {"temperature": 22, "condition": "sunny"}
            
            # Add tool result to conversation
            params.messages.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [{"id": call.id, "function": call.function}]
            })
            params.messages.append({
                "role": "tool",
                "content": json.dumps(result),
                "tool_call_id": call.id
            })
            
            # Get final response
            final_response = await provider.create_completion(params)
            print(f"Final: {final_response.choices[0]['message']['content']}")

Multi-turn Conversation

from ag_kit_py.providers import create_openai_provider, ChatCompletionParams

provider = create_openai_provider(api_key="sk-...")

# Conversation history
messages = [
    {"role": "system", "content": "You are a helpful assistant."}
]

async def chat(user_input: str):
    """Send one user turn and return the assistant's reply.

    The shared ``messages`` history is only extended after the request
    succeeds, so a failed call never leaves an unanswered user turn behind
    (which would be duplicated if the caller retried).

    Args:
        user_input: Text of the user's message.

    Returns:
        The assistant's reply content taken from the first choice.
    """
    user_message = {"role": "user", "content": user_input}

    # Send a snapshot of the history plus the new turn; the request does not
    # alias the shared list, so later appends cannot alter it in flight.
    params = ChatCompletionParams(
        model="gpt-4o-mini",
        messages=[*messages, user_message],
        temperature=0.7
    )

    response = await provider.create_completion(params)
    assistant_message = response.choices[0]["message"]["content"]

    # Commit both turns to the history now that the exchange is complete
    messages.append(user_message)
    messages.append({"role": "assistant", "content": assistant_message})

    return assistant_message

# Have a conversation
response1 = await chat("My name is Alice")
print(f"AI: {response1}")

response2 = await chat("What's my name?")
print(f"AI: {response2}")  # Should remember "Alice"

Using with LangGraph

from ag_kit_py.providers import create_openai_provider
from ag_kit_py.agents import LangGraphAgent
from langgraph.graph import StateGraph, MessagesState

# Create provider
provider = create_openai_provider(api_key="sk-...")

# Get LangChain model
model = provider.get_langchain_model(model="gpt-4o-mini", temperature=0.7)

# Build LangGraph workflow
def chat_node(state: MessagesState):
    """Invoke the chat model on the state's messages and return its reply as a state update."""
    history = state["messages"]
    reply = model.invoke(history)
    return {"messages": [reply]}

workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.set_entry_point("chat")
workflow.set_finish_point("chat")

# Compile and create agent
graph = workflow.compile()
agent = LangGraphAgent(
    name="ChatAgent",
    description="A conversational agent powered by OpenAI",
    graph=graph
)

OpenAI-Compatible Endpoints

The OpenAI provider works with any OpenAI-compatible API; point it at the service by setting base_url:

Azure OpenAI

from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig

config = ModelProviderConfig(
    api_key="your-azure-key",
    base_url="https://your-resource.openai.azure.com/openai/deployments/your-deployment",
    default_model="gpt-4o-mini"
)

provider = OpenAIProvider(config)

Local Models (e.g., Ollama, LM Studio)

from ag_kit_py.providers import create_openai_provider

# Ollama
provider = create_openai_provider(
    api_key="ollama",  # Ollama doesn't require a real key
    base_url="http://localhost:11434/v1",
    model="llama2"
)

# LM Studio
provider = create_openai_provider(
    api_key="lm-studio",
    base_url="http://localhost:1234/v1",
    model="local-model"
)

Third-Party Services

from ag_kit_py.providers import create_openai_provider

# Any OpenAI-compatible service
provider = create_openai_provider(
    api_key="your-api-key",
    base_url="https://api.your-service.com/v1",
    model="custom-model"
)

Error Handling

The OpenAI provider converts errors to ModelProviderError; inspect its error_type attribute to tell the failure categories apart:
from ag_kit_py.providers import ModelProviderError

try:
    response = await provider.create_completion(params)
except ModelProviderError as e:
    if e.error_type == "authentication":
        print("Invalid API key")
    elif e.error_type == "rate_limit":
        print("Rate limit exceeded")
    elif e.error_type == "quota_exceeded":
        print("Quota exceeded")
    elif e.error_type == "invalid_request":
        print("Invalid request parameters")
    elif e.error_type == "server_error":
        print("OpenAI server error")
    elif e.error_type == "timeout":
        print("Request timeout")
    else:
        print(f"Unknown error: {e.message}")

Error Types

The provider maps OpenAI errors to these types:
  • authentication: 401 Unauthorized, invalid API key
  • rate_limit: 429 Too Many Requests
  • quota_exceeded: Quota or billing issues
  • invalid_request: 400 Bad Request
  • server_error: 500 Internal Server Error
  • timeout: Request timeout
  • unknown: Other errors

Retry Logic

from ag_kit_py.providers import ModelProviderError
import asyncio

async def call_with_retry(provider, params, max_retries=3):
    """Call provider with exponential backoff retry.

    Args:
        provider: Object exposing an awaitable ``create_completion(params)``.
        params: Completion parameters, passed through unchanged.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        Whatever ``provider.create_completion(params)`` returns.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (previously this
            silently returned ``None`` without ever calling the provider).
        ModelProviderError: Immediately for non-retryable errors, or the
            last retryable error once all attempts are used up.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    # Only transient failures are worth retrying; authentication,
    # invalid-request and any other error types are re-raised at once.
    retryable = ("rate_limit", "server_error", "timeout")

    for attempt in range(max_retries):
        try:
            return await provider.create_completion(params)
        except ModelProviderError as e:
            if e.error_type not in retryable or attempt == max_retries - 1:
                raise

            # Exponential backoff: 1s, 2s, 4s, ...
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
            await asyncio.sleep(wait_time)

# Usage
try:
    response = await call_with_retry(provider, params)
except ModelProviderError as e:
    print(f"Failed after retries: {e.message}")

Best Practices

1. Reuse Provider Instances

# Good: Create once, reuse
provider = create_openai_provider(api_key="sk-...")
model = provider.get_langchain_model()

# Use multiple times
response1 = await model.ainvoke(messages1)
response2 = await model.ainvoke(messages2)

# Bad: Create new instance each time
for messages in message_list:
    provider = create_openai_provider(api_key="sk-...")  # Wasteful
    response = await provider.create_completion(...)

2. Use Environment Variables

# Good: Use environment variables
# (export OPENAI_API_KEY in your shell or inject it from a secrets manager;
# do not assign the key in source code)
from ag_kit_py.providers import create_provider

provider = create_provider("openai")  # Loads OPENAI_API_KEY from the environment

# Bad: Hardcode credentials
provider = create_openai_provider(api_key="sk-...")  # Don't commit!

3. Set Appropriate Timeouts

from ag_kit_py.providers import ModelProviderConfig

# For quick responses
config = ModelProviderConfig(
    api_key="sk-...",
    timeout=10000  # 10 seconds
)

# For long-running tasks
config = ModelProviderConfig(
    api_key="sk-...",
    timeout=120000  # 2 minutes
)

4. Handle Streaming Errors

async def safe_stream(provider, params):
    """Stream with error handling.

    Yields the non-empty text fragments of a streaming completion. Errors
    are reported in-band as a final bracketed message rather than raised,
    so consumers can always iterate to the end.

    Args:
        provider: Object exposing ``create_stream(params)`` as an async iterator.
        params: Completion parameters, passed through unchanged.

    Yields:
        str: Content fragments, or a single ``[Error: ...]`` /
        ``[Unexpected error: ...]`` line if streaming fails.
    """
    try:
        async for chunk in provider.create_stream(params):
            # Some chunks carry no choices (e.g. a usage-only final chunk);
            # skip them instead of failing with IndexError on choices[0].
            if not chunk.choices:
                continue
            content = chunk.choices[0]["delta"].get("content", "")
            if content:
                yield content
    except ModelProviderError as e:
        yield f"\n[Error: {e.message}]"
    except Exception as e:
        # Deliberate best-effort boundary: surface the failure in the stream
        yield f"\n[Unexpected error: {str(e)}]"

5. Monitor Token Usage

response = await provider.create_completion(params)

usage = response.usage
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Total tokens: {usage['total_tokens']}")

# Estimate cost (example for gpt-4o-mini)
prompt_cost = usage['prompt_tokens'] * 0.00015 / 1000
completion_cost = usage['completion_tokens'] * 0.0006 / 1000
total_cost = prompt_cost + completion_cost
print(f"Estimated cost: ${total_cost:.6f}")

Helper Functions

create_openai_provider()

Convenience function for creating OpenAI providers:
def create_openai_provider(
    api_key: str,
    model: Optional[str] = None,
    base_url: Optional[str] = None,
    temperature: float = 0.7,
    **kwargs
) -> OpenAIProvider:
    """Create an OpenAI provider with simplified configuration.
    
    Args:
        api_key: OpenAI API key
        model: Model name (default: gpt-4o-mini)
        base_url: Custom API endpoint
        temperature: Sampling temperature
        **kwargs: Additional configuration options
        
    Returns:
        OpenAIProvider: Configured OpenAI provider
    """
Example:
from ag_kit_py.providers import create_openai_provider

provider = create_openai_provider(
    api_key="sk-...",
    model="gpt-4o-mini",
    temperature=0.7,
    timeout=30000,
    max_retries=3
)