Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions examples/telephony/amd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from livekit import api, rtc
from livekit.agents import (
AMD,
NOT_GIVEN,
Agent,
AgentServer,
AgentSession,
Expand Down Expand Up @@ -73,37 +74,39 @@ async def entrypoint(ctx: JobContext):
raise RuntimeError(
"session room_io is unavailable. Make sure you use dev or start commands"
)
session.room_io.set_participant(participant_identity)
if participant_identity:
session.room_io.set_participant(participant_identity)

async with AMD(
session,
participant_identity=participant_identity,
participant_identity=participant_identity or NOT_GIVEN,
) as detector:
# start running amd before the SIP participant joins to avoid audio loss
logger.info(f"creating SIP participant for {participant_identity}")
await ctx.api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
room_name=ctx.room.name,
sip_trunk_id=outbound_trunk_id,
sip_call_to=phone_number,
participant_identity=participant_identity,
wait_until_answered=True,
if phone_number and outbound_trunk_id and participant_identity:
logger.info(f"creating SIP participant for {participant_identity}")
await ctx.api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
room_name=ctx.room.name,
sip_trunk_id=outbound_trunk_id,
sip_call_to=phone_number,
participant_identity=participant_identity,
wait_until_answered=True,
)
)
participant = await ctx.wait_for_participant(identity=participant_identity)
logger.info(
"participant joined",
extra={
"actual_identity": participant.identity,
"expected_identity": participant_identity,
"kind": participant.kind,
"audio_tracks_subscribed": [
pub.sid
for pub in participant.track_publications.values()
if pub.subscribed and pub.kind == rtc.TrackKind.KIND_AUDIO
],
},
)
)
participant = await ctx.wait_for_participant(identity=participant_identity)
logger.info(
"participant joined",
extra={
"actual_identity": participant.identity,
"expected_identity": participant_identity,
"kind": participant.kind,
"audio_tracks_subscribed": [
pub.sid
for pub in participant.track_publications.values()
if pub.subscribed and pub.kind == rtc.TrackKind.KIND_AUDIO
],
},
)

result = await detector.execute()
logger.info(f"AMD result: {result}")
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
from .voice.amd import (
AMD,
AMDCategory,
AMDResult,
AMDPredictionEvent,
)
from .voice.background_audio import AudioConfig, BackgroundAudioPlayer, BuiltinAudioClip, PlayHandle
from .voice.room_io import RoomInputOptions, RoomIO, RoomOutputOptions
Expand Down Expand Up @@ -232,7 +232,7 @@ def __getattr__(name: str) -> typing.Any:
"AgentHandoffEvent",
"AMD",
"AMDCategory",
"AMDResult",
"AMDPredictionEvent",
"TurnHandlingOptions",
"EndpointingOptions",
"InterruptionOptions",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def to_chat_ctx(
tc["extra_content"] = extra_content
tool_calls.append(tc)
if tool_calls:
msg["tool_calls"] = tool_calls
msg["tool_calls"] = tool_calls # type: ignore[assignment]
messages.append(msg)

# append tool outputs following the tool calls
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/voice/amd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .classifier import AMDCategory, AMDResult
from .classifier import AMDCategory, AMDPredictionEvent
from .detector import AMD

__all__ = ["AMD", "AMDCategory", "AMDResult"]
__all__ = ["AMD", "AMDCategory", "AMDPredictionEvent"]
16 changes: 8 additions & 8 deletions livekit-agents/livekit/agents/voice/amd/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class AMDCategory(str, Enum):
UNCERTAIN = "uncertain"


class AMDResult(BaseModel):
type: Literal["amd"] = "amd"
class AMDPredictionEvent(BaseModel):
type: Literal["amd_prediction"] = "amd_prediction"
speech_duration: float
category: AMDCategory
reason: str
Expand Down Expand Up @@ -102,7 +102,7 @@ def wrapper(self: "_AMDClassifier", *args: Any, **kwargs: Any) -> Any:
return wrapper


class _AMDClassifier(EventEmitter[Literal["amd_result"]]):
class _AMDClassifier(EventEmitter[Literal["amd_prediction"]]):
def __init__(
self,
llm: LLM,
Expand All @@ -129,7 +129,7 @@ def __init__(
self._silence_timer_trigger: Literal["short_speech", "long_speech"] | None = None
self._detection_timeout_timer: asyncio.TimerHandle | None = None

self._verdict_result: AMDResult | None = None
self._verdict_result: AMDPredictionEvent | None = None
self._verdict_ready = asyncio.Event()

self._llm = llm
Expand Down Expand Up @@ -231,7 +231,7 @@ def on_user_speech_ended(self, silence_duration: float) -> None:
)
self._silence_timer_trigger = "long_speech"

def _set_verdict(self, result: AMDResult) -> None:
def _set_verdict(self, result: AMDPredictionEvent) -> None:
self._verdict_result = result
self._try_emit_result()

Expand All @@ -246,7 +246,7 @@ def _try_emit_result(self) -> None:
if self._detection_timeout_timer is not None:
self._detection_timeout_timer.cancel()
self._detection_timeout_timer = None
self.emit("amd_result", self._verdict_result)
self.emit("amd_prediction", self._verdict_result)
self._emitted = True

@log_exceptions(logger=logger)
Expand All @@ -264,7 +264,7 @@ def _silence_timer_callback(

if is_given(category) and is_given(reason) and self._verdict_result is None:
self._set_verdict(
AMDResult(
AMDPredictionEvent(
speech_duration=speech_duration or self.speech_duration,
category=category,
reason=reason,
Expand Down Expand Up @@ -325,7 +325,7 @@ async def save_prediction(label: AMDCategory) -> None:
"""Save the prediction to the verdict."""
if label != AMDCategory.UNCERTAIN:
self._set_verdict(
AMDResult(
AMDPredictionEvent(
speech_duration=self.speech_duration,
category=label,
reason="llm",
Expand Down
24 changes: 15 additions & 9 deletions livekit-agents/livekit/agents/voice/amd/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
from types import TracebackType
from typing import TYPE_CHECKING, TypedDict
from typing import TYPE_CHECKING, Literal, TypedDict

from opentelemetry import trace

Expand All @@ -15,7 +15,7 @@
from ...stt import STT as _STT
from ...telemetry import trace_types, tracer
from ...types import NOT_GIVEN, NotGivenOr
from ...utils import aio, is_given
from ...utils import EventEmitter, aio, is_given
from ...utils.participant import wait_for_track_publication
from .classifier import (
AMD_PROMPT,
Expand All @@ -25,7 +25,7 @@
NO_SPEECH_THRESHOLD,
TIMEOUT,
AMDCategory,
AMDResult,
AMDPredictionEvent,
_AMDClassifier,
)

Expand Down Expand Up @@ -75,7 +75,7 @@ class DetectionOptions(TypedDict, total=False):
}


class AMD:
class AMD(EventEmitter[Literal["amd_prediction"]]):
"""Answering Machine Detection (AMD).

Detects whether an outbound call is answered by a human or a machine.
Expand Down Expand Up @@ -135,6 +135,7 @@ def __init__(
suppress_compatibility_warning: bool = False,
detection_options: NotGivenOr[DetectionOptions] = NOT_GIVEN,
) -> None:
super().__init__()
self._llm_config: NotGivenOr[LLM | LLMModels | str] = llm
self._session: AgentSession = session
self._interrupt_on_machine = interrupt_on_machine
Expand All @@ -144,7 +145,7 @@ def __init__(
self._stt: NotGivenOr[_STT] = _InferenceSTT(stt) if isinstance(stt, str) else stt

self._classifier: _AMDClassifier | None = None
self._result: AMDResult | None = None
self._result: AMDPredictionEvent | None = None
self._closed = False
self._span: trace.Span | None = None

Expand Down Expand Up @@ -176,7 +177,7 @@ def pending(self) -> bool:
def started(self) -> bool:
return self._classifier is not None and self._classifier.started

async def execute(self) -> AMDResult:
async def execute(self) -> AMDPredictionEvent:
"""Run AMD and return the result.

While executing, speech playout authorization is locked. Once the
Expand Down Expand Up @@ -250,7 +251,7 @@ async def aclose(self) -> None:
self._stt_task = None

if self._classifier:
self._classifier.off("amd_result", self._on_amd_result)
self._classifier.off("amd_prediction", self._on_amd_prediction)
await self._classifier.close()
self._classifier = None

Expand All @@ -276,7 +277,7 @@ async def _run(self, session: AgentSession) -> None:
raise ValueError(
"AMD classifier could not be resolved, please provide a compatible model"
)
self._classifier.on("amd_result", self._on_amd_result)
self._classifier.on("amd_prediction", self._on_amd_prediction)
self._closed = False
self._result = None

Expand Down Expand Up @@ -360,7 +361,7 @@ async def _receive() -> None:
finally:
await aio.cancel_and_wait(*tasks)

def _on_amd_result(self, result: AMDResult) -> None:
def _on_amd_prediction(self, result: AMDPredictionEvent) -> None:
self._result = result
if self._classifier:
self._classifier.end_input()
Expand Down Expand Up @@ -395,6 +396,11 @@ def _on_amd_result(self, result: AMDResult) -> None:
except RuntimeError:
pass

if (host := self._session._session_host) is not None:
host._on_amd_prediction(result)

self.emit("amd_prediction", result)

def _start_span(self) -> None:
if self._span:
return
Expand Down
31 changes: 31 additions & 0 deletions livekit-agents/livekit/agents/voice/remote_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections.abc import AsyncIterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp

from livekit import rtc
Expand All @@ -32,6 +33,7 @@
TTSModelUsage,
)
from ..version import __version__
from ..voice.amd import AMDCategory, AMDPredictionEvent
from .events import (
AgentState,
AgentStateChangedEvent,
Expand Down Expand Up @@ -248,6 +250,14 @@ async def __anext__(self) -> agent_pb.AgentSessionMessage:
"e2e_latency",
)

# Translation table from the internal AMDCategory enum to its protobuf wire
# representation (agent_pb.AmdCategory). Every AMDCategory member visible in
# this diff (HUMAN, MACHINE_IVR, MACHINE_VM, MACHINE_UNAVAILABLE, UNCERTAIN)
# has an entry, so lookups below can use plain indexing without a default.
_AMD_CATEGORY_MAP: dict[AMDCategory, agent_pb.AmdCategory] = {
    AMDCategory.HUMAN: agent_pb.AmdCategory.AMD_HUMAN,
    AMDCategory.MACHINE_IVR: agent_pb.AmdCategory.AMD_MACHINE_IVR,
    AMDCategory.MACHINE_VM: agent_pb.AmdCategory.AMD_MACHINE_VM,
    AMDCategory.MACHINE_UNAVAILABLE: agent_pb.AmdCategory.AMD_MACHINE_UNAVAILABLE,
    AMDCategory.UNCERTAIN: agent_pb.AmdCategory.AMD_UNCERTAIN,
}


def _tool_names(tools: Sequence[llm.Tool | Toolset]) -> list[str]:
result: list[str] = []
Expand Down Expand Up @@ -524,6 +534,27 @@ def _on_overlapping_speech(self, event: OverlappingSpeechEvent) -> None:

self._send_event(agent_pb.AgentSessionEvent(overlapping_speech=pb))

def _on_amd_prediction(self, event: AMDPredictionEvent) -> None:
    """Forward an AMD prediction to the remote session as a protobuf event.

    Converts the event's float-second durations into protobuf ``Duration``
    messages and maps the category enum through ``_AMD_CATEGORY_MAP`` before
    emitting an ``AgentSessionEvent`` with the ``amd_prediction`` payload set.
    """

    def _to_duration(seconds: float) -> Duration:
        # Convert float seconds to integer nanoseconds for the protobuf
        # Duration well-known type.
        d = Duration()
        d.FromNanoseconds(int(seconds * 1e9))
        return d

    prediction = agent_pb.AgentSessionEvent.AmdPrediction(
        speech_duration=_to_duration(event.speech_duration),
        delay=_to_duration(event.delay),
        category=_AMD_CATEGORY_MAP[event.category],
        reason=event.reason,
        transcript=event.transcript,
    )
    self._send_event(agent_pb.AgentSessionEvent(amd_prediction=prediction))

# TODO: @chenghao-mou add EOT prediction event

def _on_session_usage_updated(self, event: SessionUsageUpdatedEvent) -> None:
self._send_event(
agent_pb.AgentSessionEvent(
Expand Down
4 changes: 2 additions & 2 deletions livekit-agents/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [
"certifi>=2025.6.15",
"livekit==1.1.7",
"livekit-api>=1.0.7,<2",
"livekit-protocol>=1.1.6,<2",
"livekit-protocol>=1.1.8,<2",
"livekit-blingfire~=1.1,<2",
"protobuf>=3",
"pyjwt>=2.0",
Expand Down Expand Up @@ -143,4 +143,4 @@ include = ["/livekit"]

[tool.uv]
exclude-newer = "7 days"
exclude-newer-package = { livekit = "0 days", livekit-api = "0 days", livekit-protocol = "0 days", livekit-blingfire = "0 days" }
exclude-newer-package = { livekit = "0 days", livekit-api = "0 days", livekit-protocol = "0 days", livekit-blingfire = "0 days" }
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ def _transcription_to_speech_event(self, text: str) -> stt.SpeechEvent:
)

async def aclose(self) -> None:
await self._fal_client._client.aclose()
await (await self._fal_client._client).aclose()
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def _recognize_impl(
data = rtc.combine_audio_frames(buffer).to_wav_bytes()
model = "mansa_v1" if config.language == "en" else "legacy"
resp = await self._client.speech.transcribe(
language=config.language.language, # type: ignore
language=config.language.language,
content=data,
timeout=httpx.Timeout(30, connect=conn_options.timeout),
timestamp="word" if "mansa" in model else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
async def _run(self, output_emitter: tts.AudioEmitter) -> None:
spitch_stream = self._client.speech.with_streaming_response.generate(
text=self.input_text,
language=self._opts.language.language, # type: ignore
language=self._opts.language.language,
voice=self._opts.voice, # type: ignore
format="mp3",
timeout=httpx.Timeout(30, connect=self._conn_options.timeout),
Expand Down
Loading
Loading