Module zep_python.message.client
Expand source code
from __future__ import annotations
import urllib.parse
from typing import Any, Dict, List, Optional
import httpx
from zep_python.exceptions import handle_response
from .models import Message
class MessageClient:
"""
MessageClient class for interacting with the Zep message API.
Attributes
----------
aclient : httpx.AsyncClient
The async client used for making API requests.
client : httpx.Client
The client used for making API requests.
"""
aclient: httpx.AsyncClient
client: httpx.Client
def __init__(self, aclient: httpx.AsyncClient, client: httpx.Client) -> None:
"""
Initializes a MessageClient object.
Parameters
----------
aclient : httpx.AsyncClient
The async client used for making API requests.
client : httpx.Client
The client used for making API requests.
"""
self.aclient = aclient
self.client = client
def _validate_cursor_limit(
self, limit: int, cursor: int
) -> Optional[Dict[str, int]]:
"""
Validates the cursor and limit parameters.
Parameters
----------
limit : int
The number of messages to return per page.
cursor : int
The page number to return.
Raises
------
ValueError
If the limit or cursor is invalid.
"""
if limit <= 0 or cursor <= 0:
raise ValueError("Both limit and cursor must be positive integers")
return {"limit": limit, "cursor": cursor}
def get_session_messages(
self, session_id: str, limit: int = 100, cursor: int = 1
) -> List[Message]:
"""
Gets all messages for a session.
Parameters
----------
session_id : str
The ID of the session.
limit : int
The number of messages to return per page.
cursor : int
The page number to return.
Returns
-------
List[Message]
The list of messages for the session.
Raises
------
ValueError
If the session ID is empty.
ConnectionError
If unable to connect to the server.
APIError
If unable to get messages for the session.
NotFoundError
If the session is not found.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
params = self._validate_cursor_limit(limit, cursor)
url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages"
try:
response = self.client.get(url=url, params=params)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(response, f"Unable to get messages for session {session_id}.")
return [
Message.model_validate(message) for message in response.json()["messages"]
]
async def aget_session_messages(
self, session_id: str, limit: int = 100, cursor: int = 1
) -> List[Message]:
"""
Gets all messages for a session.
Parameters
----------
session_id : str
The ID of the session.
limit : int
The number of messages to return per page.
cursor : int
The page number to return.
Returns
-------
List[Message]
The list of messages for the session.
Raises
------
ValueError
If the session ID is empty.
ConnectionError
If unable to connect to the server.
APIError
If unable to get messages for the session.
NotFoundError
If the session is not found.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
params = self._validate_cursor_limit(limit, cursor)
url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages"
try:
response = await self.aclient.get(url=url, params=params)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(response, f"Unable to get messages for session {session_id}.")
return [
Message.model_validate(message) for message in response.json()["messages"]
]
def get_session_message(self, session_id: str, message_id: str) -> Message:
"""
Gets a specific message from a session
Parameters
----------
session_id : str
The ID of the session.
message_id : str
The ID of the message.
Returns
-------
Message
The message.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
if message_id is None or message_id.strip() == "":
raise ValueError("Message ID cannot be empty.")
encoded_message_id = urllib.parse.quote_plus(message_id)
encoded_session_id = urllib.parse.quote_plus(session_id)
url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}"
try:
response = self.client.get(url=url)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(
response, f"Unable to get message {message_id} for session {session_id}."
)
return Message.model_validate(response.json())
async def aget_session_message(self, session_id: str, message_id: str) -> Message:
"""
Gets a specific message from a session
Parameters
----------
session_id : str
The ID of the session.
message_id : str
The ID of the message.
Returns
-------
Message
The message.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
if message_id is None or message_id.strip() == "":
raise ValueError("Message ID cannot be empty.")
encoded_message_id = urllib.parse.quote_plus(message_id)
encoded_session_id = urllib.parse.quote_plus(session_id)
url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}"
try:
response = await self.aclient.get(url=url)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(
response, f"Unable to get message {message_id} for session {session_id}."
)
return Message.model_validate(response.json())
def update_message_metadata(
self, session_id: str, message_id: str, metadata: Dict[str, Any]
) -> Message:
"""
Updates the metadata of a message.
Parameters
----------
session_id : str
The ID of the session.
message_id : str
The ID of the message.
metadata : Dict[str, Any]
The metadata to update.
Returns
-------
Message
The updated message.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
if message_id is None or message_id.strip() == "":
raise ValueError("Message ID cannot be empty.")
encoded_message_id = urllib.parse.quote_plus(message_id)
encoded_session_id = urllib.parse.quote_plus(session_id)
url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}"
try:
response = self.client.patch(url=url, json=metadata)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(
response,
f"Unable to update message metadata {message_id} for session {session_id}.",
)
response_data = response.json()
return Message.model_validate(response_data)
async def aupdate_message_metadata(
self, session_id: str, message_id: str, metadata: Dict[str, Any]
) -> Message:
"""
Updates the metadata of a message.
Parameters
----------
session_id : str
The ID of the session.
message_id : str
The ID of the message.
metadata : Dict[str, Any]
The metadata to update.
Returns
-------
Message
The updated message.
"""
if session_id is None or session_id.strip() == "":
raise ValueError("Session ID cannot be empty.")
if message_id is None or message_id.strip() == "":
raise ValueError("Message ID cannot be empty.")
encoded_message_id = urllib.parse.quote_plus(message_id)
encoded_session_id = urllib.parse.quote_plus(session_id)
url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}"
try:
response = await self.aclient.patch(url=url, json=metadata)
except httpx.NetworkError:
raise ConnectionError("Unable to connect to server.")
handle_response(
response,
f"Unable to update message metadata {message_id} for session {session_id}.",
)
response_data = response.json()
return Message.model_validate(response_data)
Classes
class MessageClient (aclient: httpx.AsyncClient, client: httpx.Client)
-
MessageClient class for interacting with the Zep message API.
Attributes
aclient
:httpx.AsyncClient
- The async client used for making API requests.
client
:httpx.Client
- The client used for making API requests.
Initializes a MessageClient object.
Parameters
aclient
:httpx.AsyncClient
- The async client used for making API requests.
client
:httpx.Client
- The client used for making API requests.
Expand source code
class MessageClient: """ MessageClient class for interacting with the Zep message API. Attributes ---------- aclient : httpx.AsyncClient The async client used for making API requests. client : httpx.Client The client used for making API requests. """ aclient: httpx.AsyncClient client: httpx.Client def __init__(self, aclient: httpx.AsyncClient, client: httpx.Client) -> None: """ Initializes a MessageClient object. Parameters ---------- aclient : httpx.AsyncClient The async client used for making API requests. client : httpx.Client The client used for making API requests. """ self.aclient = aclient self.client = client def _validate_cursor_limit( self, limit: int, cursor: int ) -> Optional[Dict[str, int]]: """ Validates the cursor and limit parameters. Parameters ---------- limit : int The number of messages to return per page. cursor : int The page number to return. Raises ------ ValueError If the limit or cursor is invalid. """ if limit <= 0 or cursor <= 0: raise ValueError("Both limit and cursor must be positive integers") return {"limit": limit, "cursor": cursor} def get_session_messages( self, session_id: str, limit: int = 100, cursor: int = 1 ) -> List[Message]: """ Gets all messages for a session. Parameters ---------- session_id : str The ID of the session. limit : int The number of messages to return per page. cursor : int The page number to return. Returns ------- List[Message] The list of messages for the session. Raises ------ ValueError If the session ID is empty. ConnectionError If unable to connect to the server. APIError If unable to get messages for the session. NotFoundError If the session is not found. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") params = self._validate_cursor_limit(limit, cursor) url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages" try: response = self.client.get(url=url, params=params) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response(response, f"Unable to get messages for session {session_id}.") return [ Message.model_validate(message) for message in response.json()["messages"] ] async def aget_session_messages( self, session_id: str, limit: int = 100, cursor: int = 1 ) -> List[Message]: """ Gets all messages for a session. Parameters ---------- session_id : str The ID of the session. limit : int The number of messages to return per page. cursor : int The page number to return. Returns ------- List[Message] The list of messages for the session. Raises ------ ValueError If the session ID is empty. ConnectionError If unable to connect to the server. APIError If unable to get messages for the session. NotFoundError If the session is not found. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") params = self._validate_cursor_limit(limit, cursor) url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages" try: response = await self.aclient.get(url=url, params=params) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response(response, f"Unable to get messages for session {session_id}.") return [ Message.model_validate(message) for message in response.json()["messages"] ] def get_session_message(self, session_id: str, message_id: str) -> Message: """ Gets a specific message from a session Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. Returns ------- Message The message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = self.client.get(url=url) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to get message {message_id} for session {session_id}." ) return Message.model_validate(response.json()) async def aget_session_message(self, session_id: str, message_id: str) -> Message: """ Gets a specific message from a session Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. Returns ------- Message The message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = await self.aclient.get(url=url) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to get message {message_id} for session {session_id}." ) return Message.model_validate(response.json()) def update_message_metadata( self, session_id: str, message_id: str, metadata: Dict[str, Any] ) -> Message: """ Updates the metadata of a message. Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. metadata : Dict[str, Any] The metadata to update. Returns ------- Message The updated message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = self.client.patch(url=url, json=metadata) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to update message metadata {message_id} for session {session_id}.", ) response_data = response.json() return Message.model_validate(response_data) async def aupdate_message_metadata( self, session_id: str, message_id: str, metadata: Dict[str, Any] ) -> Message: """ Updates the metadata of a message. Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. metadata : Dict[str, Any] The metadata to update. Returns ------- Message The updated message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = await self.aclient.patch(url=url, json=metadata) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to update message metadata {message_id} for session {session_id}.", ) response_data = response.json() return Message.model_validate(response_data)
Class variables
var aclient : httpx.AsyncClient
var client : httpx.Client
Methods
async def aget_session_message(self, session_id: str, message_id: str) ‑> Message
-
Gets a specific message from a session
Parameters
session_id
:str
- The ID of the session.
message_id
:str
- The ID of the message.
Returns
Message
- The message.
Expand source code
async def aget_session_message(self, session_id: str, message_id: str) -> Message: """ Gets a specific message from a session Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. Returns ------- Message The message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = await self.aclient.get(url=url) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to get message {message_id} for session {session_id}." ) return Message.model_validate(response.json())
async def aget_session_messages(self, session_id: str, limit: int = 100, cursor: int = 1) ‑> List[Message]
-
Gets all messages for a session.
Parameters
session_id
:str
- The ID of the session.
limit
:int
- The number of messages to return per page.
cursor
:int
- The page number to return.
Returns
List[Message]
- The list of messages for the session.
Raises
ValueError
- If the session ID is empty.
ConnectionError
- If unable to connect to the server.
APIError
- If unable to get messages for the session.
NotFoundError
- If the session is not found.
Expand source code
async def aget_session_messages( self, session_id: str, limit: int = 100, cursor: int = 1 ) -> List[Message]: """ Gets all messages for a session. Parameters ---------- session_id : str The ID of the session. limit : int The number of messages to return per page. cursor : int The page number to return. Returns ------- List[Message] The list of messages for the session. Raises ------ ValueError If the session ID is empty. ConnectionError If unable to connect to the server. APIError If unable to get messages for the session. NotFoundError If the session is not found. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") params = self._validate_cursor_limit(limit, cursor) url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages" try: response = await self.aclient.get(url=url, params=params) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response(response, f"Unable to get messages for session {session_id}.") return [ Message.model_validate(message) for message in response.json()["messages"] ]
async def aupdate_message_metadata(self, session_id: str, message_id: str, metadata: Dict[str, Any]) ‑> Message
-
Updates the metadata of a message.
Parameters
session_id
:str
- The ID of the session.
message_id
:str
- The ID of the message.
metadata
:Dict[str, Any]
- The metadata to update.
Returns
Message
- The updated message.
Expand source code
async def aupdate_message_metadata( self, session_id: str, message_id: str, metadata: Dict[str, Any] ) -> Message: """ Updates the metadata of a message. Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. metadata : Dict[str, Any] The metadata to update. Returns ------- Message The updated message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = await self.aclient.patch(url=url, json=metadata) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to update message metadata {message_id} for session {session_id}.", ) response_data = response.json() return Message.model_validate(response_data)
def get_session_message(self, session_id: str, message_id: str) ‑> Message
-
Gets a specific message from a session
Parameters
session_id
:str
- The ID of the session.
message_id
:str
- The ID of the message.
Returns
Message
- The message.
Expand source code
def get_session_message(self, session_id: str, message_id: str) -> Message: """ Gets a specific message from a session Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. Returns ------- Message The message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = self.client.get(url=url) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to get message {message_id} for session {session_id}." ) return Message.model_validate(response.json())
def get_session_messages(self, session_id: str, limit: int = 100, cursor: int = 1) ‑> List[Message]
-
Gets all messages for a session.
Parameters
session_id
:str
- The ID of the session.
limit
:int
- The number of messages to return per page.
cursor
:int
- The page number to return.
Returns
List[Message]
- The list of messages for the session.
Raises
ValueError
- If the session ID is empty.
ConnectionError
- If unable to connect to the server.
APIError
- If unable to get messages for the session.
NotFoundError
- If the session is not found.
Expand source code
def get_session_messages( self, session_id: str, limit: int = 100, cursor: int = 1 ) -> List[Message]: """ Gets all messages for a session. Parameters ---------- session_id : str The ID of the session. limit : int The number of messages to return per page. cursor : int The page number to return. Returns ------- List[Message] The list of messages for the session. Raises ------ ValueError If the session ID is empty. ConnectionError If unable to connect to the server. APIError If unable to get messages for the session. NotFoundError If the session is not found. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") params = self._validate_cursor_limit(limit, cursor) url = f"/sessions/{urllib.parse.quote_plus(session_id)}/messages" try: response = self.client.get(url=url, params=params) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response(response, f"Unable to get messages for session {session_id}.") return [ Message.model_validate(message) for message in response.json()["messages"] ]
def update_message_metadata(self, session_id: str, message_id: str, metadata: Dict[str, Any]) ‑> Message
-
Updates the metadata of a message.
Parameters
session_id
:str
- The ID of the session.
message_id
:str
- The ID of the message.
metadata
:Dict[str, Any]
- The metadata to update.
Returns
Message
- The updated message.
Expand source code
def update_message_metadata( self, session_id: str, message_id: str, metadata: Dict[str, Any] ) -> Message: """ Updates the metadata of a message. Parameters ---------- session_id : str The ID of the session. message_id : str The ID of the message. metadata : Dict[str, Any] The metadata to update. Returns ------- Message The updated message. """ if session_id is None or session_id.strip() == "": raise ValueError("Session ID cannot be empty.") if message_id is None or message_id.strip() == "": raise ValueError("Message ID cannot be empty.") encoded_message_id = urllib.parse.quote_plus(message_id) encoded_session_id = urllib.parse.quote_plus(session_id) url = f"/sessions/{encoded_session_id}/messages/{encoded_message_id}" try: response = self.client.patch(url=url, json=metadata) except httpx.NetworkError: raise ConnectionError("Unable to connect to server.") handle_response( response, f"Unable to update message metadata {message_id} for session {session_id}.", ) response_data = response.json() return Message.model_validate(response_data)