
Completion API

completion(model, messages, temperature=None, max_tokens=None, stream=False, tools=None, tool_choice=None, response_format=None, num_retries=3, retry_strategy='exponential_backoff_retry', cache=None, api_key=None, api_base=None, timeout=600.0, **kwargs)

Make a completion request to an LLM provider.

Compatible with litellm.completion() API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `str` | Model name in the format `"provider/model-name"`, or just `"model-name"` | *required* |
| `messages` | `list[Dict[str, Any]]` | List of message dicts with `"role"` and `"content"` keys | *required* |
| `temperature` | `Optional[float]` | Sampling temperature (0-2) | `None` |
| `max_tokens` | `Optional[int]` | Maximum number of tokens to generate | `None` |
| `stream` | `bool` | Whether to stream the response | `False` |
| `tools` | `Optional[list[Tool]]` | List of tool/function definitions | `None` |
| `tool_choice` | `Optional[Union[str, Dict[str, Any]]]` | How to choose tools (`"auto"`, `"required"`, or a specific tool) | `None` |
| `response_format` | `Optional[ResponseFormat]` | Response format (dict or Pydantic model) | `None` |
| `num_retries` | `int` | Number of retries on rate limit or timeout | `3` |
| `retry_strategy` | `str` | Retry strategy (currently only `"exponential_backoff_retry"`) | `'exponential_backoff_retry'` |
| `cache` | `Optional[Dict[str, Any]]` | Cache control dict (accepted for compatibility; not used by ullm) | `None` |
| `api_key` | `Optional[str]` | API key (if not set in the environment) | `None` |
| `api_base` | `Optional[str]` | API base URL (if not the provider default) | `None` |
| `timeout` | `float` | Request timeout in seconds | `600.0` |
| `**kwargs` | `Any` | Additional provider-specific parameters | `{}` |
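
A minimal usage sketch, assuming `completion` is importable from the top-level `ullm` package and that `ModelResponse` follows the litellm/OpenAI response shape (`choices[0].message.content`); the model name is illustrative.

```python
from ullm import completion  # assumed top-level export

response = completion(
    model="openai/gpt-4o",  # illustrative "provider/model-name"
    messages=[
        {"role": "system", "content": "You are a concise assistant."},
        {"role": "user", "content": "Name three uses for a paperclip."},
    ],
    temperature=0.2,
    max_tokens=256,
)

# Assumes the litellm-compatible response shape noted above.
print(response.choices[0].message.content)
```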

Returns:

| Type | Description |
| --- | --- |
| `Union[ModelResponse, Iterator[StreamChunk]]` | `ModelResponse`, or an `Iterator[StreamChunk]` when streaming |
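
When `stream=True`, the call returns an iterator of `StreamChunk` objects instead of a single `ModelResponse`. A sketch of consuming it, assuming chunks expose the litellm-style `choices[0].delta.content` field (an assumption based on the stated litellm compatibility); note that the built-in retry logic is skipped for streaming requests.

```python
from ullm import completion  # assumed top-level export

chunks = completion(
    model="openai/gpt-4o",  # illustrative
    messages=[{"role": "user", "content": "Write a haiku about retries."}],
    stream=True,
)

for chunk in chunks:
    # Assumed litellm-style delta field; adjust to the actual StreamChunk shape.
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
print()
```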

Raises:

| Type | Description |
| --- | --- |
| `AuthenticationError` | On authentication failure |
| `BadRequestError` | On an invalid request |
| `RateLimitError` | When the rate limit is exceeded |
| `Timeout` | On request timeout |
| `APIError` | On other API errors |
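
The exceptions above can be caught around a call; a hedged sketch, assuming they are importable from the `ullm` package (the actual import path may differ). Keep in mind that `completion()` already retries rate limits and timeouts up to `num_retries` times with exponential backoff before raising.

```python
from ullm import completion  # assumed top-level exports
from ullm import AuthenticationError, BadRequestError, RateLimitError, Timeout, APIError

try:
    response = completion(
        model="openai/gpt-4o",  # illustrative
        messages=[{"role": "user", "content": "Ping"}],
        num_retries=3,
        timeout=30.0,
    )
except AuthenticationError:
    print("Check the API key in the environment or the api_key argument.")
except RateLimitError:
    print("Still rate limited after the built-in retries.")
except Timeout:
    print("Request exceeded the timeout.")
except BadRequestError as exc:
    print(f"Invalid request: {exc}")
except APIError as exc:
    print(f"Provider returned an error: {exc}")
```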

Source code in ullm/main.py
def completion(
    model: str,
    messages: list[Dict[str, Any]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    stream: bool = False,
    tools: Optional[list[Tool]] = None,
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
    response_format: Optional[ResponseFormat] = None,
    num_retries: int = 3,
    retry_strategy: str = "exponential_backoff_retry",
    cache: Optional[Dict[str, Any]] = None,
    api_key: Optional[str] = None,
    api_base: Optional[str] = None,
    timeout: float = 600.0,
    **kwargs: Any,
) -> Union[ModelResponse, Iterator[StreamChunk]]:
    """
    Make a completion request to an LLM provider.

    Compatible with litellm.completion() API.

    Args:
        model: Model name in format "provider/model-name" or just "model-name"
        messages: List of message dicts with "role" and "content"
        temperature: Sampling temperature (0-2)
        max_tokens: Maximum tokens to generate
        stream: Whether to stream the response
        tools: List of tool/function definitions
        tool_choice: How to choose tools ("auto", "required", or specific tool)
        response_format: Response format (dict or Pydantic model)
        num_retries: Number of retries on rate limit or timeout
        retry_strategy: Retry strategy (currently only "exponential_backoff_retry")
        cache: Cache control dict (for compatibility, not used by ullm)
        api_key: API key (if not in environment)
        api_base: API base URL (if not default)
        timeout: Request timeout in seconds
        **kwargs: Additional provider-specific parameters

    Returns:
        ModelResponse or Iterator[StreamChunk] if streaming

    Raises:
        AuthenticationError: On authentication failure
        BadRequestError: On invalid request
        RateLimitError: On rate limit exceeded
        Timeout: On request timeout
        APIError: On other API errors
    """
    provider, model_name = parse_model_name(model)

    client = _get_client(
        provider,
        api_key=api_key,
        api_base=api_base,
        timeout=timeout,
        **kwargs,
    )

    # Create retry-wrapped function if retries are enabled
    if num_retries > 0 and not stream:  # Don't retry streaming requests

        @_create_retry_decorator(num_retries)
        def _make_request():
            return client.completion(
                model=model_name,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=stream,
                tools=tools,
                tool_choice=tool_choice,
                response_format=response_format,
                **kwargs,
            )

        return _make_request()
    else:
        return client.completion(
            model=model_name,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            tools=tools,
            tool_choice=tool_choice,
            response_format=response_format,
            **kwargs,
        )
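
The `response_format` parameter accepts a dict or a Pydantic model. A sketch of structured output with a Pydantic model, assuming the structured result comes back as a JSON string in the message content (how the result is surfaced may vary by provider); the model class and prompt are illustrative.

```python
from pydantic import BaseModel

from ullm import completion  # assumed top-level export


class CityInfo(BaseModel):
    name: str
    country: str
    population: int


response = completion(
    model="openai/gpt-4o",  # illustrative
    messages=[{"role": "user", "content": "Give me basic facts about Nairobi."}],
    response_format=CityInfo,  # Pydantic model accepted per the parameter docs
)

# Assumes the structured result arrives as JSON text in the message content.
city = CityInfo.model_validate_json(response.choices[0].message.content)
print(city.name, city.population)
```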

acompletion(model, messages, temperature=None, max_tokens=None, stream=False, tools=None, tool_choice=None, response_format=None, num_retries=3, retry_strategy='exponential_backoff_retry', cache=None, api_key=None, api_base=None, timeout=600.0, **kwargs) async

Make an async completion request to an LLM provider.

Compatible with litellm.acompletion() API.

Returns:

| Type | Description |
| --- | --- |
| `Union[ModelResponse, AsyncIterator[StreamChunk]]` | `ModelResponse`, or an `AsyncIterator[StreamChunk]` when streaming |
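
An async usage sketch, assuming `acompletion` is importable from the top-level `ullm` package; parameters mirror `completion()`, and the response shape assumption is the same as in the synchronous example above.

```python
import asyncio

from ullm import acompletion  # assumed top-level export


async def main() -> None:
    response = await acompletion(
        model="openai/gpt-4o",  # illustrative
        messages=[{"role": "user", "content": "Summarize asyncio in one sentence."}],
        max_tokens=128,
    )
    # Assumed litellm-compatible response shape.
    print(response.choices[0].message.content)


asyncio.run(main())
```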

Source code in ullm/main.py
async def acompletion(
    model: str,
    messages: list[Dict[str, Any]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    stream: bool = False,
    tools: Optional[list[Tool]] = None,
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
    response_format: Optional[ResponseFormat] = None,
    num_retries: int = 3,
    retry_strategy: str = "exponential_backoff_retry",
    cache: Optional[Dict[str, Any]] = None,
    api_key: Optional[str] = None,
    api_base: Optional[str] = None,
    timeout: float = 600.0,
    **kwargs: Any,
) -> Union[ModelResponse, AsyncIterator[StreamChunk]]:
    """
    Make an async completion request to an LLM provider.

    Compatible with litellm.acompletion() API.

    Args:
        Same as completion()

    Returns:
        ModelResponse or AsyncIterator[StreamChunk] if streaming
    """
    provider, model_name = parse_model_name(model)

    client = _get_client(
        provider,
        api_key=api_key,
        api_base=api_base,
        timeout=timeout,
        **kwargs,
    )

    # Create retry-wrapped function if retries are enabled
    if num_retries > 0 and not stream:

        @_create_retry_decorator(num_retries)
        async def _make_request():
            return await client.acompletion(
                model=model_name,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=stream,
                tools=tools,
                tool_choice=tool_choice,
                response_format=response_format,
                **kwargs,
            )

        return await _make_request()
    else:
        return await client.acompletion(
            model=model_name,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=stream,
            tools=tools,
            tool_choice=tool_choice,
            response_format=response_format,
            **kwargs,
        )
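
With `stream=True`, `acompletion()` resolves to an `AsyncIterator[StreamChunk]`; as in the synchronous case, the built-in retry logic is skipped for streams. A sketch with the same assumed chunk shape as above.

```python
import asyncio

from ullm import acompletion  # assumed top-level export


async def stream_demo() -> None:
    chunks = await acompletion(
        model="openai/gpt-4o",  # illustrative
        messages=[{"role": "user", "content": "Count to five slowly."}],
        stream=True,
    )
    async for chunk in chunks:
        delta = chunk.choices[0].delta.content  # assumed litellm-style field
        if delta:
            print(delta, end="", flush=True)
    print()


asyncio.run(stream_demo())
```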

(Full API reference coming soon)