Tasks

Tasks are a great way to schedule coroutines to be run outside your event/command handler.
Tasks are managed by TSBot and automatically cleaned up on done/closing.

async def example_task(bot: TSBot):
    print("Example task called")
    # Do something with the bot


bot.register_task(example_task)

You can pass arguments to your task by providing them to the bot.register_task() method after the handler.

async def example_task(bot: TSBot, arg1: int, arg2: str):
    print("Example task called")
    # Do something with the bot


bot.register_task(example_task, 1, "test")

Warning

Task handlers should be well behaved. Tasks can be cancelled at any point. This mainly happens when the bot is closing and cleaning up running tasks. You should never block task cancelling by catching asyncio.CancelledError and ignoring it. This will hang your bot on close. If you need to do clean up in your task, catch the asyncio.CancelledError, do clean up, and return or re raise the exception.

Warning

Task handling is started before the bot is connected to the server. If you need the bot to be connected to the TeamSpeak server, Use build-in connect event event handlers to register tasks.

Every task

Every tasks are a way for you to schedule a task to run periodically. For example you could implement a AFK mover:

from __future__ import annotations

import asyncio
from contextlib import suppress

from tsbot import TSBot, TSTask, plugin, query
from tsbot.exceptions import TSResponseError

AFK_CHANNEL_ID = "2"  # Channel ID of the AFK channel
MAX_IDLE_TIME = 30 * 60  # Clients idle for more than 30 minutes are moved to AFK channel
CHECK_PERIOD = 60  # Check every minute for AFK clients


class AFKPlugin(plugin.TSPlugin):
    CLIENT_LIST_QUERY = query("clientlist").option("times")

    _task: TSTask

    def __init__(self, check_period: int, afk_channel_id: str, max_idle_time: int) -> None:
        self.check_period = check_period
        self.afk_channel_id = afk_channel_id
        self.max_idle_time = max_idle_time

        self.move_query = query("clientmove").params(cid=self.afk_channel_id)

    def is_not_query(self, client: dict[str, str]) -> bool:
        return client["client_type"] != "1"

    def is_not_active(self, client: dict[str, str]) -> bool:
        return int(client["client_idle_time"]) > self.max_idle_time * 1000

    def not_in_afk_channel(self, client: dict[str, str]) -> bool:
        return client["cid"] != self.afk_channel_id

    def should_be_moved(self, client: dict[str, str]) -> bool:
        return all(
            check(client)
            for check in (self.is_not_query, self.not_in_afk_channel, self.is_not_active)
        )

    async def check_afk_clients(self, bot: TSBot):
        clients = await bot.send(self.CLIENT_LIST_QUERY)

        clients_to_be_moved = set(c["clid"] for c in clients if self.should_be_moved(c))
        if not clients_to_be_moved:
            return

        move_query = self.move_query.param_block({"clid": id} for id in clients_to_be_moved)

        with suppress(TSResponseError):
            await bot.send(move_query)

    @plugin.on("connect")
    async def start_task(self, bot: TSBot, ctx: None):
        """Start the checking task on connect."""
        self._task = bot.register_every_task(self.check_period, self.check_afk_clients)

    @plugin.on("disconnect")
    async def cancel_task(self, bot: TSBot, ctx: None):
        """Cleanup task on disconnect."""
        self._task.cancel()


bot = TSBot(
    username="USERNAME",
    password="PASSWORD",
    address="ADDRESS",
)

bot.load_plugin(
    AFKPlugin(
        check_period=CHECK_PERIOD,
        afk_channel_id=AFK_CHANNEL_ID,
        max_idle_time=MAX_IDLE_TIME,
    ),
)

asyncio.run(bot.run())