Welcome to Joblin!


Joblin is a simple, SQLite-based, synchronous, Python job queue library.

import time
from joblin import Queue

with Queue.connect("job.db") as queue:
    # The payload here is a JSON string.
    data = '{"type": "my-type", "message": "Hello world!"}'
    queue.add_job_from_now(data, starts_after=3.0, expires_after=10.0)

    # Process jobs one at a time until none are left.
    while (job := queue.get_next_job()) is not None:
        time.sleep(job.delay)  # wait until the job is due to start
        print(f"Received job {job.id} with data: {job.data}")
        job.complete()
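The payload in the example above is a JSON string. A minimal sketch of building and parsing such payloads with the standard json module might look like the following. The helper names encode_payload and decode_payload are hypothetical and not part of joblin, and the sketch assumes job.data comes back as the same string that was stored.

```python
import json
from typing import Any


def encode_payload(type_: str, **fields: Any) -> str:
    """Serialize a job payload to a JSON string (hypothetical helper)."""
    return json.dumps({"type": type_, **fields})


def decode_payload(data: str) -> tuple[str, dict[str, Any]]:
    """Split a JSON payload back into its type and its remaining fields."""
    payload = json.loads(data)
    return payload.pop("type"), payload


data = encode_payload("my-type", message="Hello world!")
job_type, fields = decode_payload(data)
```

The resulting string can be passed as the data argument when adding a job, and decode_payload can be applied to job.data when the job is received.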

Usage

With Python 3.11+ and Git, this library can be installed using:

pip install git+https://github.com/thegamecracks/joblin@v0.3.0.post2

Afterwards you can import joblin and use the Queue class to start storing jobs.

Examples

Check out the examples/ directory for reference on using the queue.

[Image: _images/tkinter_app.png]

License

This project is licensed under the MIT license.

API Reference

class joblin.Queue

Bases: object

A queue that persists jobs in an SQLite database. Not thread-safe.

queue = Queue.connect("job.db")
job = queue.add_job("Some payload")
job = queue.get_next_job()
if job is not None:
    job.complete()
queue.close()

This class does not directly provide a mechanism for executing jobs, but rather expects the caller to retrieve the next job and wait until the job can start. As such, it is also the caller’s responsibility to reschedule whenever a new job is added, potentially from other processes.
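Since the queue leaves execution to the caller, a small driver loop is usually needed. The following is a rough sketch of one, assuming only the get_next_job(), Job.delay, Job.data and Job.complete() members documented on this page. The run_pending function and its handler and sleep parameters are hypothetical names introduced for illustration.

```python
import time
from typing import Any, Callable


def run_pending(
    queue: Any,
    handler: Callable[[Any], None],
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run jobs one at a time until the queue reports no next job.

    `queue` is assumed to expose get_next_job() as documented here;
    `handler` and `sleep` are hypothetical hooks for this sketch.
    """
    handled = 0
    while (job := queue.get_next_job()) is not None:
        sleep(job.delay)   # wait until the job is due to start
        handler(job.data)  # caller-defined work on the payload
        job.complete()     # mark it done so the loop moves on
        handled += 1
    return handled
```

This loop does not notice jobs added by other processes while it is sleeping, so a long-running worker would still need some way to re-query the queue.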

The queue can be used in a context manager to automatically close the database upon exiting. For example:

with Queue.connect("job.db") as queue:
    ...

Public Data Attributes:

conn

The SQLite connection used to query for jobs.

time_func

The function used to get the current time.

Public Methods:

__init__(conn, *, time_func)

conn – The SQLite connection used to query for jobs.

connect(path[, time_func])

Connect and set up an SQLite database at the given path.

__enter__()

__exit__(exc_type, exc_val, tb)

add_job(data, *[, created_at, starts_at, ...])

Add a job to the queue.

add_job_from_now(data, *[, starts_after, ...])

A convenience method to add a job relative to the current time.

get_job_by_id(job_id)

Get a job from the queue by ID.

get_next_job([now])

Get the next job in the queue.

get_next_job_delay([now])

Get the next job's ID and the amount of time in seconds to wait until it starts.

count_pending_jobs([now])

Count the number of jobs that need to run.

lock_job(job_id, *[, locked_at])

Attempt to lock the given job.

lock_next_job([now])

Get and lock the next job in the queue.

lock_next_job_delay([now])

Lock the next job and return its ID and the amount of time in seconds to wait until it starts.

unlock_job(job_id)

Attempt to unlock the given job.

complete_job(job_id[, completed_at])

Mark the given job as completed.

delete_job(job_id)

Delete a job from the queue by ID.

delete_completed_jobs()

Delete all completed jobs.

delete_expired_jobs([now])

Delete all expired jobs.

close()

Close the queue.

time()

Get the current time as returned by time_func.


__init__(conn: Connection, *, time_func: Callable[[], float]) -> None

Parameters:
  • conn – The SQLite connection used to query for jobs.

  • time_func – The function used to get the current time.

conn: Connection

The SQLite connection used to query for jobs.

time_func: Callable[[], float]

The function used to get the current time.

classmethod connect(path: str, time_func: Callable[[], float] = time.time, **kwargs) -> Self

Connect and set up an SQLite database at the given path.

with Queue.connect("job.db") as queue:
    ...

:memory: can be used instead of a file path to create an in-memory queue. In this case, it may be desirable to provide a monotonic time function to avoid abnormal behaviour from the system time changing.

Extra arguments will be passed to sqlite3.connect().
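Any zero-argument callable returning a float fits the time_func signature. As a sketch, time.monotonic is one candidate for in-memory queues, and a hand-advanced clock can make tests deterministic. The ManualClock class below is a hypothetical helper, not something joblin provides.

```python
import time
from typing import Callable


class ManualClock:
    """A hand-advanced clock usable wherever a time_func is expected."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def __call__(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        if seconds < 0:
            raise ValueError("a monotonic clock cannot go backwards")
        self._now += seconds


# Both satisfy Callable[[], float]; either could be passed as
# time_func, e.g. Queue.connect(":memory:", time_func=clock).
monotonic_clock: Callable[[], float] = time.monotonic
clock = ManualClock()
clock.advance(2.5)
```

A monotonic clock only makes sense when the stored times never need to be compared across program runs, which is why it pairs naturally with an in-memory queue.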

Parameters:
  • path – The database path to open.

  • time_func – The function used to get the current time.

Returns:

A new queue instance.

add_job(data: Any, *, created_at: float | None = None, starts_at: float | None = None, expires_at: float | None = None) -> Job

Add a job to the queue.

job = queue.add_job("Hello world!", created_at=123.45, starts_at=130, expires_at=140)

Parameters:
  • data – The payload to be stored with the job.

  • created_at – The time at which the job was created. Defaults to the current time.

  • starts_at – The time at which the job should be executed. This cannot be lower than the creation time. Defaults to the job’s creation time.

  • expires_at – The time at which the job will expire. This cannot be lower than the start time. If None, the job will never expire.

Returns:

The job that was added.

Raises:

sqlite3.IntegrityError – The start or expiration time was invalid.

add_job_from_now(data: Any, *, starts_after: float = 0.0, expires_after: float | None = None, created_at: float | None = None) -> Job

A convenience method to add a job relative to the current time.

job = queue.add_job_from_now("Hello world!", starts_after=10, expires_after=20)

Parameters:
  • data – The payload to be stored with the job.

  • starts_after – The amount of time in seconds after the creation time at which the job should start. This cannot be a negative value.

  • expires_after – The amount of time in seconds after which the job will expire. This cannot be a negative value. If None, the job will never expire.

  • created_at – The time at which the job was created. Defaults to the current time.

Returns:

The job that was added.

Raises:

sqlite3.IntegrityError – The start or expiration time was invalid.

get_job_by_id(job_id: int) -> Job | None

Get a job from the queue by ID.

job = queue.get_job_by_id(1234)

Parameters:

job_id – The ID of the job.

Returns:

A job object, or None if not found.

get_next_job(now: float | None = None) -> Job | None

Get the next job in the queue.

job = queue.get_next_job()

If two jobs start at the same time, the job with the lower ID gets priority.

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The next job to be completed, if any.

get_next_job_delay(now: float | None = None) -> tuple[int, float] | None

Get the next job’s ID and the amount of time in seconds to wait until it starts.

job_delay = queue.get_next_job_delay()
if job_delay is not None:
    job_id, delay = job_delay

    time.sleep(delay)

    job = queue.get_job_by_id(job_id)
    if job is not None:
        ...

This reduces unnecessary I/O compared to get_next_job() when only the time is needed.

If the job’s start time is overdue, the delay will be 0.

To avoid race conditions in cases where the job’s start and expiration time are equal, the job object should be retrieved with get_job_by_id() rather than calling get_next_job().

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The next job’s ID and delay, or None if no job is pending.

Added in version 0.3.0.

count_pending_jobs(now: float | None = None) -> int

Count the number of jobs that need to run.

pending = queue.count_pending_jobs()

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The number of pending jobs.

lock_job(job_id: int, *, locked_at: float | None = None) -> bool

Attempt to lock the given job.

success = queue.lock_job(1234)

This prevents the job from showing up in subsequent get_next_job() calls.

If the job is already locked or does not exist, this returns False.

Parameters:
  • job_id – The ID of the job.

  • locked_at – The time at which the job was locked. Defaults to the current time.

Returns:

True if the job was locked, False otherwise.

Added in version 0.3.0.

lock_next_job(now: float | None = None) -> Job | None

Get and lock the next job in the queue.

job = queue.lock_next_job()
if job is not None:
    time.sleep(job.delay)

    # In case job got updated or deleted, check again
    job = queue.get_job_by_id(job.id)
    if job is not None:
        job.complete()
        job.unlock()

This should be preferred over manually calling get_next_job() and lock_job() as this method will do both in a single transaction, reducing the chance of other connections trying to lock the same job.

If two jobs start at the same time, the job with the lower ID gets priority.

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The next job to be completed, if any.

Added in version 0.3.0.

lock_next_job_delay(now: float | None = None) -> tuple[int, float] | None

Lock the next job and return its ID and the amount of time in seconds to wait until it starts.

job_delay = queue.lock_next_job_delay()
if job_delay is not None:
    job_id, delay = job_delay

    time.sleep(delay)

    job = queue.get_job_by_id(job_id)
    if job is not None:
        job.complete()
        job.unlock()

This should be preferred over manually calling get_next_job() and lock_job() as this method will do both in a single transaction, reducing the chance of other connections trying to lock the same job.

This reduces unnecessary I/O compared to lock_next_job() when only the time is needed.

If the job’s start time is overdue, the delay will be 0.

To avoid race conditions in cases where the job’s start and expiration time are equal, the job object should be retrieved with get_job_by_id() rather than calling get_next_job().

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The next job’s ID and delay, or None if no job is pending.

Added in version 0.3.0.

unlock_job(job_id: int) -> bool

Attempt to unlock the given job.

success = queue.unlock_job(1234)

Unlike lock_job(), this method returns True if the job is already unlocked.

If the job does not exist, this returns False.

Parameters:

job_id – The ID of the job.

Returns:

True if the job was unlocked, False otherwise.

Added in version 0.3.0.
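The return values of lock_job() and unlock_job() are not symmetric, which is easy to overlook. The toy in-memory model below reproduces only the documented return values so the difference is visible side by side. LockModel is a hypothetical class for illustration and shares no code with joblin.

```python
from typing import Iterable


class LockModel:
    """Toy in-memory model of the documented return values (not joblin code)."""

    def __init__(self, job_ids: Iterable[int]) -> None:
        self._locked = {job_id: False for job_id in job_ids}

    def lock_job(self, job_id: int) -> bool:
        # False when the job is missing or already locked.
        if job_id not in self._locked or self._locked[job_id]:
            return False
        self._locked[job_id] = True
        return True

    def unlock_job(self, job_id: int) -> bool:
        # False only when the job is missing.
        if job_id not in self._locked:
            return False
        self._locked[job_id] = False
        return True


model = LockModel([1])
results = [
    model.lock_job(1),    # the job was unlocked, so locking succeeds
    model.lock_job(1),    # already locked
    model.unlock_job(1),  # now unlocked
    model.unlock_job(1),  # already unlocked still counts as success
    model.unlock_job(2),  # no such job
]
```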

complete_job(job_id: int, completed_at: float | None = None) -> bool

Mark the given job as completed.

success = queue.complete_job(1234)

If the job does not exist, this is a no-op.

Parameters:
  • job_id – The ID of the job.

  • completed_at – The time at which the job was completed. Defaults to the current time.

Returns:

True if the job was updated, False otherwise.

delete_job(job_id: int) -> bool

Delete a job from the queue by ID.

success = queue.delete_job(1234)

Parameters:

job_id – The ID of the job.

Returns:

True if the job existed, False otherwise.

delete_completed_jobs() -> int

Delete all completed jobs.

deleted = queue.delete_completed_jobs()

Returns:

The number of jobs that were deleted.

delete_expired_jobs(now: float | None = None) -> int

Delete all expired jobs.

deleted = queue.delete_expired_jobs()

Jobs marked as completed will not be considered as expired.

Parameters:

now – The time to treat as the current time. Defaults to the queue’s current time.

Returns:

The number of jobs that were deleted.
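The two bulk delete methods can be combined into a single housekeeping step. Below is a minimal sketch of that, assuming only the delete_completed_jobs() and delete_expired_jobs() methods documented here. The purge_finished name and the shape of its return value are hypothetical.

```python
from typing import Any


def purge_finished(queue: Any) -> dict[str, int]:
    """Delete completed and expired jobs, reporting how many of each."""
    completed = queue.delete_completed_jobs()
    expired = queue.delete_expired_jobs()
    return {"completed": completed, "expired": expired}
```

Since completed jobs are not considered expired, the two counts should not overlap.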

close() -> None

Close the queue.

Added in version 0.2.0.

time() -> float

Get the current time as returned by time_func.

class joblin.Job

Bases: object

A job created by the queue.

Any methods here that call the queue are not thread-safe. If querying from another thread is desired, you must create a new connection and queue.
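One way to give each thread its own queue is to create it lazily through threading.local. The sketch below shows a small generic holder for that purpose. PerThread is a hypothetical helper, and the factory passed to it is assumed to open a fresh connection, for example lambda: Queue.connect("job.db").

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerThread(Generic[T]):
    """Lazily create one object per thread using the given factory."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()

    def get(self) -> T:
        try:
            return self._local.value
        except AttributeError:
            # First access from this thread: build a dedicated instance.
            self._local.value = self._factory()
            return self._local.value
```

Job objects obtained in one thread would still be tied to that thread's queue, so passing job IDs between threads is the safer choice.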

queue: Queue

The queue associated with this job.

id: int

The job’s ID stored in the queue.

data: Any

The payload stored with this job.

created_at: float

The time at which this job was created.

starts_at: float

The time at which this job should be executed.

expires_at: float | None

The time at which this job will expire.

If None, the job will never expire.

__init__(queue: Queue, id: int, data: Any, created_at: float, starts_at: float, expires_at: float | None, completed_at: float | None, locked_at: float | None) -> None

completed_at: float | None

The time at which this job was completed, or None if not completed.

locked_at: float | None

The time at which this job was locked, or None if not locked.

Added in version 0.3.0.

complete(completed_at: float | None = None) -> bool

Mark the job as completed.

This is a convenience method for calling Queue.complete_job().

If the job does not exist, this is a no-op.

Note that calling this method does not change the completed_at attribute.

Parameters:

completed_at – The time at which the job was completed. Defaults to the queue’s current time.

Returns:

True if the job was updated, False otherwise.

Changed in version 0.1.1: completed_at now defaults to None as originally implied, rather than being a required parameter.

delete() -> bool

Delete the job from the queue.

This is a convenience method for calling Queue.delete_job().

Returns:

True if the job existed, False otherwise.

lock(locked_at: float | None = None) -> bool

Attempt to lock this job.

This is a convenience method for calling Queue.lock_job().

This prevents the job from showing up in subsequent Queue.get_next_job() calls.

If the job is already locked or does not exist, this returns False.

Parameters:

locked_at – The time at which the job was locked. Defaults to the current time.

Returns:

True if the job was locked, False otherwise.

Added in version 0.3.0.

unlock() -> bool

Attempt to unlock this job.

This is a convenience method for calling Queue.unlock_job().

Unlike lock(), this method returns True if the job is already unlocked.

If the job does not exist, this returns False.

Returns:

True if the job was unlocked, False otherwise.

Added in version 0.3.0.

property delay: float

The amount of time in seconds until the job starts.

If the job’s start time is overdue, this returns 0.

Added in version 0.3.0.
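The behaviour described for delay amounts to a clamped subtraction. As a sketch, and assuming the delay is simply the start time minus the current time with a floor of zero, it could be written as follows. The delay_until function is a hypothetical name, not part of joblin.

```python
def delay_until(starts_at: float, now: float) -> float:
    """Seconds to wait before a job starts, never negative."""
    return max(0.0, starts_at - now)


upcoming = delay_until(starts_at=10.0, now=4.0)  # job starts in the future
overdue = delay_until(starts_at=10.0, now=12.0)  # job is already overdue
```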