
#264 gdtest_skill_complex OK CONFIG
Tests skill with subdirectories: references/, scripts/, assets/
Sophisticated skill composition: a curated SKILL.md accompanied by the full Agent Skills directory structure, consisting of references/ (API cheatsheet, migration guide), scripts/ (setup helper, test runner), and assets/ (config template). The SKILL.md body cross-references companion files with directory tree diagrams, tables, and embedded code samples. Tests that the raw rendering handles complex multi-file skill structures.

Build Mode

● Has great-docs.yml

This package ships a pre-supplied config. The great-docs init step is skipped and great-docs build uses the spec-defined configuration directly. Tests specific config options and their rendered output.

Dimensions

S7 | Skill with subdirs (refs/scripts/assets) | skill

Source Files

📁 gdtest_skill_complex/
📄 __init__.py
"""A task-orchestration framework."""

__version__ = "3.1.0"
__all__ = [
    "Scheduler",
    "Task",
    "CronExpr",
    "TaskResult",
    "every",
    "cron",
    "once",
    "chain",
]


class Scheduler:
    """
    Central task scheduler with worker pool.

    Parameters
    ----------
    workers
        Number of concurrent worker threads.
    name
        Scheduler instance identifier.
    """

    def __init__(self, workers: int = 1, name: str = "default"):
        self.workers = workers
        self.name = name
        self._tasks: list = []

    def every(self, seconds: int, fn=None, name: str = "") -> "Task":
        """
        Schedule a recurring task at a fixed interval.

        Parameters
        ----------
        seconds
            Interval in seconds between executions.
        fn
            Callable to execute.
        name
            Task identifier.

        Returns
        -------
        Task
            The registered task.
        """
        t = Task(name=name or "interval", fn=fn)
        self._tasks.append(t)
        return t

    def cron(self, expr: str, fn=None, name: str = "") -> "Task":
        """
        Schedule a task using a cron expression.

        Parameters
        ----------
        expr
            Cron expression (e.g., ``*/5 * * * *``).
        fn
            Callable to execute.
        name
            Task identifier.

        Returns
        -------
        Task
            The registered task.
        """
        CronExpr(expr)  # validate
        t = Task(name=name or "cron", fn=fn)
        self._tasks.append(t)
        return t

    def once(self, delay: int, fn=None, name: str = "") -> "Task":
        """
        Schedule a one-shot task after a delay.

        Parameters
        ----------
        delay
            Delay in seconds before execution.
        fn
            Callable to execute.
        name
            Task identifier.

        Returns
        -------
        Task
            The registered task.
        """
        t = Task(name=name or "once", fn=fn)
        self._tasks.append(t)
        return t

    def chain(self, *tasks: "Task") -> list["TaskResult"]:
        """
        Execute tasks sequentially, passing each result to the next.

        Parameters
        ----------
        *tasks
            Tasks to chain in order.

        Returns
        -------
        list[TaskResult]
            Results from each task in the chain.
        """
        return [TaskResult(task=t, status="pending") for t in tasks]

    def submit(self, fn=None, name: str = "") -> "Task":
        """
        Submit a fire-and-forget task.

        Parameters
        ----------
        fn
            Callable to execute.
        name
            Task identifier.

        Returns
        -------
        Task
            The submitted task.
        """
        t = Task(name=name or "submitted", fn=fn)
        self._tasks.append(t)
        return t

    def run(self) -> list["TaskResult"]:
        """
        Start the scheduler and block until stopped.

        Returns
        -------
        list[TaskResult]
            Results of all completed tasks.
        """
        return []

    def stop(self) -> None:
        """Stop the scheduler gracefully."""
        pass


class Task:
    """
    A unit of work managed by a Scheduler.

    Parameters
    ----------
    name
        Task identifier (must be unique within a scheduler).
    fn
        Callable to execute.
    timeout
        Maximum execution time in seconds (None = no limit).
    """

    def __init__(self, name: str, fn=None, timeout: int | None = None):
        self.name = name
        self.fn = fn
        self.timeout = timeout


class CronExpr:
    """
    A parsed cron expression.

    Parameters
    ----------
    expr
        Cron expression string (5 fields: min hour dom month dow).

    Raises
    ------
    ValueError
        If the expression is malformed.
    """

    def __init__(self, expr: str):
        parts = expr.strip().split()
        if len(parts) != 5:
            raise ValueError(
                f"Cron expression must have 5 fields, got {len(parts)}: {expr!r}"
            )
        self.expr = expr

    def matches(self, dt) -> bool:
        """
        Check if a datetime matches this cron expression.

        Parameters
        ----------
        dt
            A datetime to test.

        Returns
        -------
        bool
            True if the datetime matches.
        """
        return True


class TaskResult:
    """
    The result of a completed task.

    Parameters
    ----------
    task
        The task that produced this result.
    status
        Execution status (pending, success, failed, timeout).
    value
        Return value from the task callable.
    error
        Exception if the task failed.
    """

    def __init__(
        self,
        task: Task,
        status: str = "pending",
        value=None,
        error: Exception | None = None,
    ):
        self.task = task
        self.status = status
        self.value = value
        self.error = error


def every(seconds: int, fn=None) -> Task:
    """
    Module-level shortcut: schedule a recurring task.

    Parameters
    ----------
    seconds
        Interval between executions.
    fn
        Callable to execute.

    Returns
    -------
    Task
        The registered task.
    """
    return Scheduler().every(seconds, fn)


def cron(expr: str, fn=None) -> Task:
    """
    Module-level shortcut: schedule a cron task.

    Parameters
    ----------
    expr
        Cron expression.
    fn
        Callable to execute.

    Returns
    -------
    Task
        The registered task.
    """
    return Scheduler().cron(expr, fn)


def once(delay: int, fn=None) -> Task:
    """
    Module-level shortcut: schedule a one-shot task.

    Parameters
    ----------
    delay
        Seconds before execution.
    fn
        Callable to execute.

    Returns
    -------
    Task
        The registered task.
    """
    return Scheduler().once(delay, fn)


def chain(*tasks: Task) -> list[TaskResult]:
    """
    Module-level shortcut: chain tasks sequentially.

    Parameters
    ----------
    *tasks
        Tasks to chain.

    Returns
    -------
    list[TaskResult]
        Results from each task.
    """
    return Scheduler().chain(*tasks)
📁 skills/
📁 gdtest-skill-complex/
📁 assets/
📄 config-template.yaml
# gdtest-skill-complex starter configuration
# Copy this to your project root as scheduler-config.yaml

scheduler:
  workers: 4
  name: production

tasks:
  - name: healthcheck
    type: every
    seconds: 30
    fn: app.health.check

  - name: nightly-backup
    type: cron
    expr: "0 2 * * *"
    fn: app.backup.run
    timeout: 3600

  - name: weekly-report
    type: cron
    expr: "0 9 * * 1"
    fn: app.reports.weekly

  - name: data-cleanup
    type: cron
    expr: "0 3 * * 0"
    fn: app.maintenance.cleanup
    timeout: 7200
📁 references/
📄 api-cheatsheet.md
# API Cheatsheet: gdtest-skill-complex

## Classes

| Class | Constructor | Key Methods |
|-------|------------|-------------|
| `Scheduler` | `(workers=1, name='default')` | `.every()`, `.cron()`, `.once()`, `.submit()`, `.chain()`, `.run()`, `.stop()` |
| `Task` | `(name, fn=None, timeout=None)` | - |
| `CronExpr` | `(expr)` | `.matches(dt)` |
| `TaskResult` | `(task, status, value, error)` | - |

## Module-level functions

| Function | Signature | Equivalent to |
|----------|-----------|---------------|
| `every()` | `(seconds, fn)` | `Scheduler().every(...)` |
| `cron()` | `(expr, fn)` | `Scheduler().cron(...)` |
| `once()` | `(delay, fn)` | `Scheduler().once(...)` |
| `chain()` | `(*tasks)` | `Scheduler().chain(...)` |

## Cron expression format

```
┌───────────── minute (0–59)
│ ┌─────────── hour (0–23)
│ │ ┌───────── day of month (1–31)
│ │ │ ┌─────── month (1–12)
│ │ │ │ ┌───── day of week (0–6, Sun=0)
* * * * *
```
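
As a rough illustration of the five-field layout above, the sketch below splits an expression into named fields. `CRON_FIELDS` and `split_cron` are hypothetical names introduced here, not part of the package; the field-count check simply mirrors the error raised by the `CronExpr` constructor.

```python
# Hypothetical helper, not part of gdtest_skill_complex.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr: str) -> dict:
    """Map each of the five cron fields to its raw string value."""
    parts = expr.strip().split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"Cron expression must have 5 fields, got {len(parts)}: {expr!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("0 9 * * 1-5")
print(fields["hour"], fields["day_of_week"])  # prints: 9 1-5
```

The helper only labels the fields; it does not interpret ranges or step values such as `1-5` or `*/5`.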

## Common patterns

```python
# Every 30 seconds
scheduler.every(seconds=30, fn=poll)

# Every 5 minutes (cron)
scheduler.cron("*/5 * * * *", fn=check)

# Weekdays at 9 AM
scheduler.cron("0 9 * * 1-5", fn=report)

# One-shot in 2 minutes
scheduler.once(delay=120, fn=migrate)

# Sequential pipeline
scheduler.chain(task_a, task_b, task_c)
```
📄 migration-v2-v3.md
# Migration Guide: v2 → v3

## Breaking changes

### 1. `Scheduler.interval()` → `Scheduler.every()`

```python
# v2 (removed)
scheduler.interval(30, fn=poll)

# v3
scheduler.every(seconds=30, fn=poll)
```

### 2. `Task.callback` → `Task.fn`

```python
# v2
task = Task("name", callback=my_func)

# v3
task = Task("name", fn=my_func)
```

### 3. CronExpr validates eagerly

```python
# v2: no error until .matches() called
expr = CronExpr("bad")
expr.matches(now)  # ValueError here

# v3: error on construction
expr = CronExpr("bad")  # ValueError here
```
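
One practical consequence of eager validation is that a malformed expression can be caught at the point where the schedule is defined. The sketch below shows one way to do that. It uses a local stand-in `CronExpr` that copies only the five-field check from the v3 constructor, so it runs without the package installed; `safe_cron` is a hypothetical helper name.

```python
from typing import Optional


class CronExpr:
    """Stand-in copying the v3 constructor check (5 whitespace-separated fields)."""

    def __init__(self, expr: str):
        parts = expr.strip().split()
        if len(parts) != 5:
            raise ValueError(
                f"Cron expression must have 5 fields, got {len(parts)}: {expr!r}"
            )
        self.expr = expr


def safe_cron(expr: str) -> Optional[CronExpr]:
    """Hypothetical helper: return a CronExpr, or None if construction fails."""
    try:
        return CronExpr(expr)
    except ValueError as exc:
        print(f"rejected: {exc}")
        return None


for candidate in ("*/5 * * * *", "bad"):
    result = safe_cron(candidate)
    print(candidate, "->", "ok" if result is not None else "invalid")
```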

### 4. Worker default changed

| Setting | v2 | v3 |
|---------|----|----|
| `Scheduler(workers=...)` | `os.cpu_count()` | `1` |

Set `workers` explicitly to preserve v2 behavior:

```python
import os
scheduler = Scheduler(workers=os.cpu_count())
```

## Non-breaking additions

- `scheduler.submit()`: fire-and-forget tasks
- `scheduler.chain()`: sequential task pipelines
- `TaskResult.error`: exception capture on failure
- `Task.timeout`: per-task execution time limit
📁 scripts/
📄 run-tests.sh
#!/usr/bin/env bash
# Run test suite with coverage for gdtest-skill-complex
set -euo pipefail

echo "Running tests with coverage..."
pytest tests/ \
    --cov=gdtest_skill_complex \
    --cov-report=term-missing \
    --cov-fail-under=80

echo "Tests complete."
📄 setup-env.sh
#!/usr/bin/env bash
# Setup development environment for gdtest-skill-complex
set -euo pipefail

echo "Creating virtual environment..."
python -m venv .venv
source .venv/bin/activate

echo "Installing package with dev extras..."
pip install -e ".[dev,test]"

echo "Running initial validation..."
python -c "from gdtest_skill_complex import Scheduler; print('Import OK')"

echo "Environment ready."
📄 SKILL.md
---
name: gdtest-skill-complex
description: >
  Orchestrate recurring, cron-scheduled, and one-shot tasks with
  gdtest-skill-complex. Supports worker pools, task chaining,
  cron expressions, timeouts, and fire-and-forget patterns.
license: MIT
compatibility: Requires Python >=3.10.
metadata:
  author: gdg-test-suite
  version: "3.1"
  tags:
    - task-scheduling
    - cron
    - orchestration
    - worker-pool
---

# gdtest-skill-complex

A task-orchestration framework for scheduling, chaining, and
monitoring background work.

## Quick start

```python
from gdtest_skill_complex import Scheduler

sched = Scheduler(workers=4)
sched.every(seconds=60, fn=check_health, name="healthcheck")
sched.cron("0 2 * * *", fn=nightly_backup, name="backup")
sched.run()
```

## Skill directory structure

This skill ships with companion files for agent consumption:

```
skills/gdtest-skill-complex/
├── SKILL.md              ← This file
├── references/
│   ├── api-cheatsheet.md ← Quick-reference for all public APIs
│   └── migration-v2-v3.md ← Migration guide from v2 to v3
├── scripts/
│   ├── setup-env.sh      ← Environment bootstrap script
│   └── run-tests.sh      ← Test runner with coverage
└── assets/
    └── config-template.yaml ← Starter configuration file
```

## When to use what

| Need | Use |
|------|-----|
| Fixed-interval polling | `scheduler.every(seconds=30, fn=poll)` |
| Cron-scheduled jobs | `scheduler.cron('*/5 * * * *', fn=job)` |
| One-shot delayed task | `scheduler.once(delay=120, fn=migrate)` |
| Sequential pipeline | `scheduler.chain(extract, transform, load)` |
| Fire-and-forget | `scheduler.submit(fn=send_email)` |
| Concurrent workers | `Scheduler(workers=cpu_count())` |

## Core concepts

### Scheduler

The `Scheduler` manages a pool of workers and a task registry.
Tasks are added via `.every()`, `.cron()`, `.once()`, or `.submit()`.
Call `.run()` to start blocking execution, or `.stop()` to shut down
gracefully.

### Task

A `Task` wraps a callable with a unique name and optional timeout.
Task names **must be unique** within a scheduler; duplicates raise
`DuplicateTaskError`.
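
The uniqueness rule can be pictured as a registry keyed by task name. The sketch below is one way such a check might work, not the package's implementation: `TaskRegistry` is a hypothetical name, and `DuplicateTaskError` is defined locally as a stand-in because its real definition is not shown here.

```python
class DuplicateTaskError(Exception):
    """Local stand-in; the package's own exception class is not shown here."""


class TaskRegistry:
    """Hypothetical registry that rejects a second task with the same name."""

    def __init__(self):
        self._tasks = {}

    def add(self, name: str, fn=None) -> None:
        # Refuse to overwrite an existing entry with the same name.
        if name in self._tasks:
            raise DuplicateTaskError(f"Task name already registered: {name!r}")
        self._tasks[name] = fn


registry = TaskRegistry()
registry.add("healthcheck")
try:
    registry.add("healthcheck")
except DuplicateTaskError as exc:
    print(exc)
```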

### CronExpr

Parses and evaluates standard 5-field cron expressions
(minute, hour, day-of-month, month, day-of-week). Does **not**
support second-level granularity.

```python
from datetime import datetime

from gdtest_skill_complex import CronExpr

expr = CronExpr("*/5 * * * *")   # every 5 minutes
expr.matches(datetime.now())       # True/False
```

### TaskResult

Returned by `.run()` and `.chain()`. Contains the execution status
(`pending`, `success`, `failed`, `timeout`), the return value, and
any exception.

## Task chaining

Chain tasks to build sequential pipelines where each task's output
feeds the next:

```python
extract = Task("extract", fn=extract_data)
transform = Task("transform", fn=clean_and_normalize)
load = Task("load", fn=write_to_db)

results = scheduler.chain(extract, transform, load)
for r in results:
    print(f"{r.task.name}: {r.status}")
```
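
To make "each task's output feeds the next" concrete, here is a minimal sketch using plain callables. How the package actually passes values between tasks is not shown in this document, so `run_chain` and the three sample functions are assumptions made for illustration only.

```python
def run_chain(*fns, initial=None):
    """Call each function with the previous return value; collect all outputs."""
    outputs = []
    value = initial
    for fn in fns:
        value = fn(value)
        outputs.append(value)
    return outputs


def extract_data(_):
    # Pretend source rows with inconsistent whitespace and case.
    return [" a ", "B", " c"]


def clean_and_normalize(rows):
    return [r.strip().lower() for r in rows]


def count_rows(rows):
    return len(rows)


print(run_chain(extract_data, clean_and_normalize, count_rows))
# prints: [[' a ', 'B', ' c'], ['a', 'b', 'c'], 3]
```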

## Reference files

### API cheatsheet (`references/api-cheatsheet.md`)

A condensed reference of every public class and function:

| Symbol | Signature | Purpose |
|--------|-----------|---------|
| `Scheduler` | `(workers=1, name='default')` | Central orchestrator |
| `Task` | `(name, fn=None, timeout=None)` | Unit of work |
| `CronExpr` | `(expr)` | Cron parser |
| `TaskResult` | `(task, status, value, error)` | Execution result |
| `every()` | `(seconds, fn)` | Module-level interval shortcut |
| `cron()` | `(expr, fn)` | Module-level cron shortcut |
| `once()` | `(delay, fn)` | Module-level one-shot shortcut |
| `chain()` | `(*tasks)` | Module-level chain shortcut |

### Migration guide (`references/migration-v2-v3.md`)

Key changes from v2 to v3:

1. `Scheduler.interval()` renamed to `Scheduler.every()`.
2. `Task.callback` renamed to `Task.fn`.
3. `CronExpr` now validates on construction (was lazy).
4. Worker count defaults to 1 (was `os.cpu_count()`).

## Scripts

### `scripts/setup-env.sh`

Bootstrap a development environment:

```bash
#!/usr/bin/env bash
set -euo pipefail
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev,test]"
echo "Environment ready."
```

### `scripts/run-tests.sh`

Run the test suite with coverage:

```bash
#!/usr/bin/env bash
set -euo pipefail
pytest tests/ --cov=gdtest_skill_complex --cov-report=term-missing
```

## Configuration template

The `assets/config-template.yaml` provides a starter configuration:

```yaml
# gdtest-skill-complex configuration
scheduler:
  workers: 4
  name: production

tasks:
  - name: healthcheck
    type: every
    seconds: 30
    fn: app.health.check

  - name: nightly-backup
    type: cron
    expr: "0 2 * * *"
    fn: app.backup.run
    timeout: 3600

  - name: weekly-report
    type: cron
    expr: "0 9 * * 1"
    fn: app.reports.weekly
```
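
How the template is consumed is not specified here. As a hedged sketch, the code below assumes each `type` value names the `Scheduler` method to call and describes the call each entry would imply. The config is written as a plain dict mirroring two entries of the YAML so that no parser is needed; `plan_calls` is a hypothetical helper.

```python
# Plain-dict mirror of two entries from the template (no YAML parser needed).
config = {
    "scheduler": {"workers": 4, "name": "production"},
    "tasks": [
        {"name": "healthcheck", "type": "every", "seconds": 30,
         "fn": "app.health.check"},
        {"name": "nightly-backup", "type": "cron", "expr": "0 2 * * *",
         "fn": "app.backup.run", "timeout": 3600},
    ],
}


def plan_calls(cfg: dict) -> list:
    """Hypothetical helper: describe the scheduler call each task entry implies."""
    calls = []
    for task in cfg["tasks"]:
        if task["type"] == "every":
            arg = f"seconds={task['seconds']}"
        elif task["type"] == "cron":
            arg = repr(task["expr"])
        else:
            raise ValueError(f"Unknown task type: {task['type']!r}")
        calls.append(
            f"scheduler.{task['type']}({arg}, fn={task['fn']}, name={task['name']!r})"
        )
    return calls


for line in plan_calls(config):
    print(line)
```

The sketch only builds descriptive strings. Resolving dotted paths such as `app.health.check` to callables, and applying `timeout`, are left out.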

## Error handling

```python
results = scheduler.run()
for r in results:
    if r.status == "failed":
        print(f"Task {r.task.name} failed: {r.error}")
    elif r.status == "timeout":
        print(f"Task {r.task.name} timed out after {r.task.timeout}s")
```

## Capabilities and boundaries

**What agents can configure:**

- Create schedulers with worker pools
- Register interval, cron, one-shot, and fire-and-forget tasks
- Chain tasks into sequential pipelines
- Set per-task timeouts
- Parse and validate cron expressions
- Use reference files for quick API lookups
- Run setup and test scripts

**Requires human setup:**

- Deploying as a system service (systemd, Docker, etc.)
- Configuring monitoring and alerting
- Setting up log aggregation
- Database and credential provisioning

## Resources

- [llms.txt](llms.txt): Indexed API reference for LLMs
- [llms-full.txt](llms-full.txt): Comprehensive documentation for LLMs
📄 README.md
# gdtest-skill-complex

A task-orchestration framework with comprehensive agent skill
documentation including reference cheatsheets, setup scripts,
and configuration templates.

## Installation

```bash
pip install gdtest-skill-complex
```
📄 great-docs.yml
site_url: "https://example.com/gdtest-skill-complex"
skill:
  gotchas:
    - "`Scheduler.tick()` is not re-entrant; never call it from inside a task callback."
    - "Task names must be unique within a `Scheduler` instance; duplicates raise `DuplicateTaskError`."
    - "The `CronExpr` parser does not support second-level granularity; minimum resolution is one minute."
  best_practices:
    - "Use `Scheduler(workers=N)` to match your CPU core count for CPU-bound tasks."
    - "Always set `task.timeout` to avoid runaway tasks; the default is no timeout."
    - "Prefer `Scheduler.submit()` over `Scheduler.run()` for fire-and-forget patterns."
  decision_table:
    - if: Running tasks on a fixed interval
      then: "scheduler.every(seconds=30, fn=task)"
    - if: Running tasks on a cron schedule
      then: "scheduler.cron('*/5 * * * *', fn=task)"
    - if: Running a one-shot delayed task
      then: "scheduler.once(delay=60, fn=task)"
    - if: Running tasks concurrently
      then: "scheduler = Scheduler(workers=4)"
    - if: Need task dependency chains
      then: "scheduler.chain(task_a, task_b, task_c)"