import asyncio
import tempfile
from ag_kit_py.tools import UnsafeLocalCodeExecutor, create_bash_tool
from ag_kit_py.tools.bash import create_local_bash_context
from ag_kit_py.tools.fs import (
create_read_tool,
create_write_tool,
ExecutionContext,
LocalFileOperator,
)
from ag_kit_py.tools.adapters import AGKitTool
async def main():
    """Run a four-step demo of AG-Kit tools exposed through LangChain adapters.

    A temporary directory is created and passed as the working directory to
    both the file-system ``ExecutionContext`` and the local bash context.
    Four tools (write, read, bash, code executor) are wrapped in
    ``AGKitTool`` and converted with ``as_langchain_tool()``, then invoked
    in sequence through ``ainvoke``:

    1. write ``script.py`` (a relative path; presumably resolved against the
       context's working directory -- confirm against the fs tool),
    2. read it back and print the length of the returned content,
    3. run ``ls -la`` through the bash tool (its output is discarded),
    4. submit the same Python source to the code executor and print a
       50-character preview of the result.

    The temporary directory is removed when the ``with`` block exits.
    """
    # The same source is written to disk in step 1 and executed in step 4,
    # so keep a single copy instead of two literals that could drift apart.
    script_source = "numbers = [1, 2, 3, 4, 5]\nprint(f'Sum: {sum(numbers)}')"

    with tempfile.TemporaryDirectory() as temp_dir:
        # Setup contexts: both point at the same temporary directory.
        fs_context = ExecutionContext(
            working_directory=temp_dir,
            fs_operator=LocalFileOperator(),
        )
        bash_context = create_local_bash_context(cwd=temp_dir)

        # Create and convert all tools to LangChain (named rather than
        # indexed so each call site says which tool it is using).
        write_tool = AGKitTool(tool=create_write_tool(fs_context)).as_langchain_tool()
        read_tool = AGKitTool(tool=create_read_tool(fs_context)).as_langchain_tool()
        bash_tool = AGKitTool(tool=create_bash_tool(bash_context)).as_langchain_tool()
        exec_tool = AGKitTool(tool=UnsafeLocalCodeExecutor()).as_langchain_tool()

        # Workflow: Write → Read → Process → Execute
        print("1. Writing Python script...")
        await write_tool.ainvoke({
            "file_path": "script.py",
            "content": script_source,
            "create_dirs": True,
        })

        print("2. Reading script...")
        content = await read_tool.ainvoke({"file_path": "script.py"})
        print(f" Content length: {len(str(content))} chars")

        print("3. Listing files...")
        await bash_tool.ainvoke({"command": "ls -la"})

        print("4. Executing script...")
        result = await exec_tool.ainvoke({
            "language": "python",
            "code": script_source,
        })
        # The tool output may not be a str; coerce before slicing (as is
        # already done for `content` above) so the preview cannot raise on a
        # dict or other non-sliceable result.
        print(f" Result: {str(result)[:50]}...")

        print("\n✅ Workflow completed successfully!")
# Script entry point: run the demo workflow coroutine to completion.
# NOTE(review): this call is unguarded, so it also executes whenever the module
# is imported; consider an `if __name__ == "__main__":` guard if that is not
# intended.
asyncio.run(main())