OpenAI Provider
The OpenAI Provider implements the `BaseModelProvider` interface for the OpenAI API and any OpenAI-compatible endpoints (e.g., Azure OpenAI, local models, third-party services).
Installation
Copy
pip install ag_kit_py
This also installs `langchain-openai` for LangChain integration automatically.
Quick Start
Basic Usage
Copy
from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig
# Create configuration
config = ModelProviderConfig(
api_key="sk-...",
default_model="gpt-4o-mini",
temperature=0.7
)
# Create provider
provider = OpenAIProvider(config)
# Get LangChain model
model = provider.get_langchain_model()
Using Factory Function
Copy
from ag_kit_py.providers import create_openai_provider
provider = create_openai_provider(
api_key="sk-...",
model="gpt-4o-mini",
temperature=0.7
)
Using Environment Variables
Copy
import os
from ag_kit_py.providers import create_provider
# Set environment variables
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["OPENAI_MODEL"] = "gpt-4o-mini"
# Create provider (automatically loads from environment)
provider = create_provider("openai")
Configuration
ModelProviderConfig
Copy
from ag_kit_py.providers import ModelProviderConfig
config = ModelProviderConfig(
api_key="sk-...", # Required: OpenAI API key
base_url=None, # Optional: Custom endpoint
default_model="gpt-4o-mini", # Optional: Default model
temperature=0.7, # Optional: Sampling temperature
timeout=30000, # Optional: Request timeout (ms)
max_retries=3, # Optional: Max retry attempts
retry_delay=1000, # Optional: Retry delay (ms)
organization=None, # Optional: Organization ID
project=None # Optional: Project ID
)
provider = OpenAIProvider(config)
Environment Variables
The OpenAI provider supports these environment variables:
Copy
# Required
OPENAI_API_KEY=sk-...
# Optional
OPENAI_MODEL=gpt-4o-mini
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_TEMPERATURE=0.7
API Reference
OpenAIProvider
Copy
class OpenAIProvider(BaseModelProvider):
"""OpenAI model provider implementation."""
def __init__(self, config: ModelProviderConfig):
"""Initialize OpenAI provider.
Args:
config: Provider configuration
Raises:
ModelProviderError: If configuration is invalid
"""
Methods
get_provider_name()
Get the provider name.
Copy
def get_provider_name(self) -> str:
"""Get provider name.
Returns:
str: "openai"
"""
get_default_model()
Get the default model name.
Copy
def get_default_model(self) -> str:
"""Get default model.
Returns:
str: "gpt-4o-mini"
"""
get_langchain_model()
Get a LangChain ChatOpenAI model instance.
Copy
def get_langchain_model(self, **kwargs) -> ChatOpenAI:
"""Get LangChain ChatOpenAI model instance.
This method returns a configured ChatOpenAI instance that can be used
with LangGraph and other LangChain components.
Args:
**kwargs: Additional model configuration to override defaults
(e.g., model, temperature, max_tokens)
Returns:
ChatOpenAI: Configured ChatOpenAI instance
"""
Copy
from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig
config = ModelProviderConfig(api_key="sk-...", default_model="gpt-4o-mini")
provider = OpenAIProvider(config)
# Get default model
model = provider.get_langchain_model()
# Override configuration
model = provider.get_langchain_model(
model="gpt-4o",
temperature=0.9,
max_tokens=2000
)
# Use with LangChain
from langchain_core.messages import HumanMessage
response = await model.ainvoke([HumanMessage(content="Hello!")])
print(response.content)
create_completion()
Create a non-streaming chat completion.
Copy
async def create_completion(
self,
params: ChatCompletionParams
) -> ChatCompletion:
"""Create a chat completion.
Args:
params: Completion parameters
Returns:
ChatCompletion: Complete response with message content and metadata
Raises:
ModelProviderError: If completion fails
"""
Copy
from ag_kit_py.providers import ChatCompletionParams
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"}
],
temperature=0.7,
max_tokens=100
)
response = await provider.create_completion(params)
print(response.choices[0]["message"]["content"])
create_stream()
Create a streaming chat completion.
Copy
async def create_stream(
self,
params: ChatCompletionParams
) -> AsyncIterator[ChatCompletionChunk]:
"""Create a streaming chat completion.
Args:
params: Completion parameters
Yields:
ChatCompletionChunk: Incremental response chunks
Raises:
ModelProviderError: If streaming fails
"""
Copy
from ag_kit_py.providers import ChatCompletionParams
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Tell me a story"}],
temperature=0.7,
stream=True
)
async for chunk in provider.create_stream(params):
content = chunk.choices[0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)
format_tools()
Format tools for the OpenAI API.
Copy
def format_tools(self, tools: List[Any]) -> List[Dict[str, Any]]:
"""Format tools for OpenAI API.
Args:
tools: List of tools in various formats
Returns:
List[Dict[str, Any]]: Formatted tool definitions
"""
Copy
tools = [
{
"name": "get_weather",
"description": "Get current weather",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
]
formatted = provider.format_tools(tools)
# Returns:
# [{
# "type": "function",
# "function": {
# "name": "get_weather",
# "description": "Get current weather",
# "parameters": {...}
# }
# }]
parse_tool_calls()
Parse tool calls from a completion response.
Copy
def parse_tool_calls(self, response: ChatCompletion) -> List[ToolCall]:
"""Parse tool calls from completion response.
Args:
response: Chat completion response
Returns:
List[ToolCall]: Parsed tool calls
"""
Copy
response = await provider.create_completion(params)
tool_calls = provider.parse_tool_calls(response)
for call in tool_calls:
print(f"Tool: {call.function['name']}")
print(f"Arguments: {call.function['arguments']}")
Usage Examples
Simple Chat
Copy
from ag_kit_py.providers import create_openai_provider, ChatCompletionParams
# Create provider
provider = create_openai_provider(api_key="sk-...")
# Create completion
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
]
)
response = await provider.create_completion(params)
print(response.choices[0]["message"]["content"])
Streaming Response
Copy
from ag_kit_py.providers import create_openai_provider, ChatCompletionParams
provider = create_openai_provider(api_key="sk-...")
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Write a poem about Python"}],
temperature=0.8,
stream=True
)
print("Streaming response:")
async for chunk in provider.create_stream(params):
content = chunk.choices[0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)
print("\n")
Tool/Function Calling
Copy
from ag_kit_py.providers import create_openai_provider, ChatCompletionParams
import json
provider = create_openai_provider(api_key="sk-...")
# Define tools
tools = [
{
"name": "get_weather",
"description": "Get the current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g., 'Paris'"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
]
# Request with tools
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "What's the weather in Paris?"}],
tools=tools,
tool_choice="auto"
)
response = await provider.create_completion(params)
# Check for tool calls
tool_calls = provider.parse_tool_calls(response)
if tool_calls:
for call in tool_calls:
print(f"Tool: {call.function['name']}")
args = json.loads(call.function['arguments'])
print(f"Arguments: {args}")
# Execute tool (mock)
if call.function['name'] == "get_weather":
result = {"temperature": 22, "condition": "sunny"}
# Add tool result to conversation
params.messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{"id": call.id, "function": call.function}]
})
params.messages.append({
"role": "tool",
"content": json.dumps(result),
"tool_call_id": call.id
})
# Get final response
final_response = await provider.create_completion(params)
print(f"Final: {final_response.choices[0]['message']['content']}")
Multi-turn Conversation
Copy
from ag_kit_py.providers import create_openai_provider, ChatCompletionParams
provider = create_openai_provider(api_key="sk-...")
# Conversation history
messages = [
{"role": "system", "content": "You are a helpful assistant."}
]
async def chat(user_input: str):
# Add user message
messages.append({"role": "user", "content": user_input})
# Get response
params = ChatCompletionParams(
model="gpt-4o-mini",
messages=messages,
temperature=0.7
)
response = await provider.create_completion(params)
assistant_message = response.choices[0]["message"]["content"]
# Add assistant message to history
messages.append({"role": "assistant", "content": assistant_message})
return assistant_message
# Have a conversation
response1 = await chat("My name is Alice")
print(f"AI: {response1}")
response2 = await chat("What's my name?")
print(f"AI: {response2}") # Should remember "Alice"
Using with LangGraph
Copy
from ag_kit_py.providers import create_openai_provider
from ag_kit_py.agents import LangGraphAgent
from langgraph.graph import StateGraph, MessagesState
# Create provider
provider = create_openai_provider(api_key="sk-...")
# Get LangChain model
model = provider.get_langchain_model(model="gpt-4o-mini", temperature=0.7)
# Build LangGraph workflow
def chat_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.set_entry_point("chat")
workflow.set_finish_point("chat")
# Compile and create agent
graph = workflow.compile()
agent = LangGraphAgent(
name="ChatAgent",
description="A conversational agent powered by OpenAI",
graph=graph
)
OpenAI-Compatible Endpoints
The OpenAI provider works with any OpenAI-compatible API by setting the `base_url`:
Azure OpenAI
Copy
from ag_kit_py.providers import OpenAIProvider, ModelProviderConfig
config = ModelProviderConfig(
api_key="your-azure-key",
base_url="https://your-resource.openai.azure.com/openai/deployments/your-deployment",
default_model="gpt-4o-mini"
)
provider = OpenAIProvider(config)
Local Models (e.g., Ollama, LM Studio)
Copy
from ag_kit_py.providers import create_openai_provider
# Ollama
provider = create_openai_provider(
api_key="ollama", # Ollama doesn't require a real key
base_url="http://localhost:11434/v1",
model="llama2"
)
# LM Studio
provider = create_openai_provider(
api_key="lm-studio",
base_url="http://localhost:1234/v1",
model="local-model"
)
Third-Party Services
Copy
from ag_kit_py.providers import create_openai_provider
# Any OpenAI-compatible service
provider = create_openai_provider(
api_key="your-api-key",
base_url="https://api.your-service.com/v1",
model="custom-model"
)
Error Handling
The OpenAI provider converts errors to `ModelProviderError` with specific error types:
Copy
from ag_kit_py.providers import ModelProviderError
try:
response = await provider.create_completion(params)
except ModelProviderError as e:
if e.error_type == "authentication":
print("Invalid API key")
elif e.error_type == "rate_limit":
print("Rate limit exceeded")
elif e.error_type == "quota_exceeded":
print("Quota exceeded")
elif e.error_type == "invalid_request":
print("Invalid request parameters")
elif e.error_type == "server_error":
print("OpenAI server error")
elif e.error_type == "timeout":
print("Request timeout")
else:
print(f"Unknown error: {e.message}")
Error Types
The provider maps OpenAI errors to these types:
- `authentication`: 401 Unauthorized, invalid API key
- `rate_limit`: 429 Too Many Requests
- `quota_exceeded`: Quota or billing issues
- `invalid_request`: 400 Bad Request
- `server_error`: 500 Internal Server Error
- `timeout`: Request timeout
- `unknown`: Other errors
Retry Logic
Copy
from ag_kit_py.providers import ModelProviderError
import asyncio
async def call_with_retry(provider, params, max_retries=3):
"""Call provider with exponential backoff retry."""
for attempt in range(max_retries):
try:
return await provider.create_completion(params)
except ModelProviderError as e:
# Don't retry authentication or invalid request errors
if e.error_type in ["authentication", "invalid_request"]:
raise
# Retry rate limit and server errors
if e.error_type in ["rate_limit", "server_error", "timeout"]:
if attempt == max_retries - 1:
raise
# Exponential backoff
wait_time = 2 ** attempt
print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
await asyncio.sleep(wait_time)
else:
raise
# Usage
try:
response = await call_with_retry(provider, params)
except ModelProviderError as e:
print(f"Failed after retries: {e.message}")
Best Practices
1. Reuse Provider Instances
Copy
# Good: Create once, reuse
provider = create_openai_provider(api_key="sk-...")
model = provider.get_langchain_model()
# Use multiple times
response1 = await model.ainvoke(messages1)
response2 = await model.ainvoke(messages2)
# Bad: Create new instance each time
for messages in message_list:
provider = create_openai_provider(api_key="sk-...") # Wasteful
response = await provider.create_completion(...)
2. Use Environment Variables
Copy
# Good: Use environment variables
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
provider = create_provider("openai")
# Bad: Hardcode credentials
provider = create_openai_provider(api_key="sk-...") # Don't commit!
3. Set Appropriate Timeouts
Copy
from ag_kit_py.providers import ModelProviderConfig
# For quick responses
config = ModelProviderConfig(
api_key="sk-...",
timeout=10000 # 10 seconds
)
# For long-running tasks
config = ModelProviderConfig(
api_key="sk-...",
timeout=120000 # 2 minutes
)
4. Handle Streaming Errors
Copy
async def safe_stream(provider, params):
"""Stream with error handling."""
try:
async for chunk in provider.create_stream(params):
content = chunk.choices[0]["delta"].get("content", "")
if content:
yield content
except ModelProviderError as e:
yield f"\n[Error: {e.message}]"
except Exception as e:
yield f"\n[Unexpected error: {str(e)}]"
5. Monitor Token Usage
Copy
response = await provider.create_completion(params)
usage = response.usage
print(f"Prompt tokens: {usage['prompt_tokens']}")
print(f"Completion tokens: {usage['completion_tokens']}")
print(f"Total tokens: {usage['total_tokens']}")
# Estimate cost (example for gpt-4o-mini)
prompt_cost = usage['prompt_tokens'] * 0.00015 / 1000
completion_cost = usage['completion_tokens'] * 0.0006 / 1000
total_cost = prompt_cost + completion_cost
print(f"Estimated cost: ${total_cost:.6f}")
Helper Functions
create_openai_provider()
Convenience function for creating OpenAI providers:
Copy
def create_openai_provider(
api_key: str,
model: Optional[str] = None,
base_url: Optional[str] = None,
temperature: float = 0.7,
**kwargs
) -> OpenAIProvider:
"""Create an OpenAI provider with simplified configuration.
Args:
api_key: OpenAI API key
model: Model name (default: gpt-4o-mini)
base_url: Custom API endpoint
temperature: Sampling temperature
**kwargs: Additional configuration options
Returns:
OpenAIProvider: Configured OpenAI provider
"""
Copy
from ag_kit_py.providers import create_openai_provider
provider = create_openai_provider(
api_key="sk-...",
model="gpt-4o-mini",
temperature=0.7,
timeout=30000,
max_retries=3
)
Related Documentation
- Providers Overview - Provider system overview
- Base Provider - Abstract base class
- Factory Functions - Provider creation utilities
- LangGraph Agent - Using providers with agents