golazy.dev
–
golazy.dev
/
lazyjobs
Index
|
Files
|
Directories
package lazyjobs ¶
import "golazy.dev/lazyjobs"
Constants ¶
const ControlJobsPath ¶
const ControlJobsPath = "/jobs"
const DefaultQueue ¶
const DefaultQueue = "default"
Variables ¶
var ErrNoWork ¶
var ErrNoWork = errors.New("lazyjobs: no work")
Functions ¶
func RegisterControlPlaneHandlers ¶
func RegisterControlPlaneHandlers(controlPlane *lazycontrolplane.ControlPlane, runner *JobRunner)
func RegisterLazyDevHandlers ¶
func RegisterLazyDevHandlers(controlPlane *lazycontrolplane.ControlPlane, runner *JobRunner)
func WithRunner ¶
func WithRunner(ctx context.Context, runner *JobRunner) context.Context
Types ¶
type Backend ¶
type Backend interface {
Insert(context.Context, InsertParams) (Record, error)
Claim(context.Context, ClaimParams) (Record, bool, error)
Complete(context.Context, int64) error
Retry(context.Context, RetryParams) error
Discard(context.Context, DiscardParams) error
List(context.Context, ListOptions) ([]Record, error)
Stats(context.Context) (Stats, error)
}
type BaseJob ¶
type BaseJob struct{}
func (BaseJob) JobMaxAttempts ¶
func (BaseJob) JobMaxAttempts() int
func (BaseJob) JobQueue ¶
func (BaseJob) JobQueue() string
func (BaseJob) JobRetryDelay ¶
func (BaseJob) JobRetryDelay(attempt int, _ error) time.Duration
type ClaimParams ¶
type ClaimParams struct {
Queues []string
Now time.Time
}
type Config ¶
type Config struct {
Backend Backend
Define func(*JobRunner)
Workers int
PollInterval time.Duration
Queues []string
}
type Definition ¶
type Definition struct {
Kind string `json:"kind"`
Type string `json:"type"`
Queue string `json:"queue"`
MaxAttempts int `json:"max_attempts"`
}
type DiscardParams ¶
type DiscardParams struct {
ID int64
LastError string
}
type InsertParams ¶
type InsertParams struct {
Kind string
Queue string
Payload json.RawMessage
MaxAttempts int
RunAt time.Time
}
type Job ¶
type Job interface {
Kind() string
Work(context.Context) error
}
type JobRunner ¶
type JobRunner struct {
// contains filtered or unexported fields
}
func New ¶
func New(config Config) (*JobRunner, error)
func RunnerFromContext ¶
func RunnerFromContext(ctx context.Context) (*JobRunner, bool)
func (r *JobRunner) Definitions ¶
func (r *JobRunner) Definitions() []Definition
func (r *JobRunner) Enqueue ¶
func (r *JobRunner) Enqueue(ctx context.Context, job Job) (Record, error)
func (r *JobRunner) EnqueueAt ¶
func (r *JobRunner) EnqueueAt(ctx context.Context, job Job, runAt time.Time) (Record, error)
func (r *JobRunner) EnqueueIn ¶
func (r *JobRunner) EnqueueIn(ctx context.Context, job Job, delay time.Duration) (Record, error)
func (r *JobRunner) MustRegister ¶
func (r *JobRunner) MustRegister(prototype Job)
func (r *JobRunner) Register ¶
func (r *JobRunner) Register(prototype Job) error
func (r *JobRunner) Running ¶
func (r *JobRunner) Running() bool
func (r *JobRunner) Snapshot ¶
func (r *JobRunner) Snapshot(ctx context.Context) (Snapshot, error)
func (r *JobRunner) Start ¶
func (r *JobRunner) Start(ctx context.Context)
func (r *JobRunner) Stop ¶
func (r *JobRunner) Stop(ctx context.Context) error
type ListOptions ¶
type ListOptions struct {
Limit int
}
type QueueNamer ¶
type QueueNamer interface {
JobQueue() string
}
type Record ¶
type Record struct {
ID int64 `json:"id"`
Kind string `json:"kind"`
Queue string `json:"queue"`
Payload json.RawMessage `json:"-"`
State State `json:"state"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
RunAt time.Time `json:"run_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
LastError string `json:"last_error,omitempty"`
}
type RetryParams ¶
type RetryParams struct {
ID int64
RunAt time.Time
LastError string
}
type RetryPolicy ¶
type RetryPolicy interface {
JobMaxAttempts() int
JobRetryDelay(attempt int, err error) time.Duration
}
type Snapshot ¶
type Snapshot struct {
Running bool `json:"running"`
Definitions []Definition `json:"definitions"`
Stats Stats `json:"stats"`
Recent []Record `json:"recent"`
}
type State ¶
type State string
type Stats ¶
type Stats struct {
Total int `json:"total"`
ByState map[State]int `json:"by_state"`
ByKind map[string]int `json:"by_kind"`
ByQueue map[string]int `json:"by_queue"`
}
Directories ¶
| Path | Synopsis |
|---|---|
| lazyjobs/inmemoryjobs | Package inmemoryjobs provides an in-memory lazyjobs backend. |
Package lazyjobs defines typed background jobs, a runner, and the storage contract used to persist queued work.
A job is a pointer to a struct that implements Job. The struct is the payload: JobRunner.Enqueue JSON-encodes it, stores that JSON through a Backend, and a worker later decodes the record back into the registered concrete type before calling Work. Job.Kind is the stable name stored with each record, so it should not be changed casually once jobs have been queued.
JobRunner.Register records the job definitions the runner is allowed to decode and run. The optional QueueNamer and RetryPolicy interfaces let a job choose a queue, max attempts, and retry delay. Embedding BaseJob is the common way to use the default queue, 25 attempts, and an exponential retry delay capped at one minute while overriding only the methods a job needs.
The runner owns worker lifecycle and state transitions. Enqueued records start as StatePending. Backend.Claim moves due work on a watched queue to StateRunning and increments the attempt count. A successful Work call moves the record to StateSucceeded. A returned error or panic moves the record to StateRetrying with a future RunAt until attempts are exhausted, then the record moves to StateDiscarded with LastError. Decode failures and unknown job kinds are discarded because the runner cannot safely execute them.
Backend is deliberately small: it persists records, atomically claims due work, records completion, retry, and discard decisions, and exposes List and Stats for inspection. The inmemoryjobs backend is useful for development, tests, and apps that do not need durable jobs. Durable applications can supply another backend without changing job types or runner code.
In a GoLazy application, lazyapp.Config.Jobs is the usual integration point. lazyapp calls the JobsConfig after lazydeps has initialized the app context, fills in inmemoryjobs.New when no backend is provided, creates the JobRunner, stores it in the context with WithRunner, starts it, and registers the job control-plane endpoint. Code that receives an app request context can call RunnerFromContext and enqueue work without importing the application package.
RegisterControlPlaneHandlers exposes GET /jobs on a lazycontrolplane.ControlPlane and returns a Snapshot with definitions, aggregate stats, and recent records. RegisterLazyDevHandlers currently registers the same endpoint; lazyapp calls it for lazydev builds as part of the development control plane. Custom apps only need to call these functions when they assemble lazycontrolplane and JobRunner directly instead of using lazyapp.