Initial AI service skeleton
This commit is contained in:
229
internal/store/store.go
Normal file
229
internal/store/store.go
Normal file
@@ -0,0 +1,229 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ai-service/internal/model"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func Open(ctx context.Context, databaseURL string) (*Store, error) {
|
||||
if strings.TrimSpace(databaseURL) == "" {
|
||||
return nil, errors.New("DATABASE_URL is required")
|
||||
}
|
||||
cfg, err := pgxpool.ParseConfig(databaseURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse database url: %w", err)
|
||||
}
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect postgres: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("ping postgres: %w", err)
|
||||
}
|
||||
return &Store{pool: pool}, nil
|
||||
}
|
||||
|
||||
func (s *Store) Close() {
|
||||
s.pool.Close()
|
||||
}
|
||||
|
||||
func (s *Store) Ping(ctx context.Context) error {
|
||||
return s.pool.Ping(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) Exec(ctx context.Context, sql string, args ...any) error {
|
||||
_, err := s.pool.Exec(ctx, sql, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) CreateJob(ctx context.Context, in model.CreateJob) (*model.Job, error) {
|
||||
if err := validateCreateJob(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if in.MaxAttempts <= 0 {
|
||||
in.MaxAttempts = 3
|
||||
}
|
||||
if len(in.Input) == 0 {
|
||||
in.Input = json.RawMessage(`{}`)
|
||||
}
|
||||
scheduledAt := time.Now().UTC()
|
||||
if in.ScheduledAt != nil {
|
||||
scheduledAt = in.ScheduledAt.UTC()
|
||||
}
|
||||
|
||||
const q = `
|
||||
INSERT INTO ai_jobs (
|
||||
owner_service, owner_ref, task_type, model_profile, priority, max_attempts,
|
||||
input, scheduled_at, idempotency_key
|
||||
)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
|
||||
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL
|
||||
DO UPDATE SET updated_at = ai_jobs.updated_at
|
||||
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||
attempts, max_attempts, input, result, error_code, error_message,
|
||||
scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key
|
||||
`
|
||||
row := s.pool.QueryRow(ctx, q,
|
||||
in.OwnerService,
|
||||
in.OwnerRef,
|
||||
in.TaskType,
|
||||
in.ModelProfile,
|
||||
in.Priority,
|
||||
in.MaxAttempts,
|
||||
in.Input,
|
||||
scheduledAt,
|
||||
in.IdempotencyKey,
|
||||
)
|
||||
return scanJob(row)
|
||||
}
|
||||
|
||||
func validateCreateJob(in model.CreateJob) error {
|
||||
switch {
|
||||
case strings.TrimSpace(in.OwnerService) == "":
|
||||
return errors.New("owner_service is required")
|
||||
case strings.TrimSpace(in.OwnerRef) == "":
|
||||
return errors.New("owner_ref is required")
|
||||
case strings.TrimSpace(in.TaskType) == "":
|
||||
return errors.New("task_type is required")
|
||||
case strings.TrimSpace(in.ModelProfile) == "":
|
||||
return errors.New("model_profile is required")
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||
const q = `
|
||||
SELECT id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||
attempts, max_attempts, input, result, error_code, error_message,
|
||||
scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key
|
||||
FROM ai_jobs
|
||||
WHERE id = $1
|
||||
`
|
||||
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
return job, err
|
||||
}
|
||||
|
||||
func (s *Store) RetryJob(ctx context.Context, id uuid.UUID) (*model.Job, error) {
|
||||
const q = `
|
||||
UPDATE ai_jobs
|
||||
SET status = 'pending',
|
||||
attempts = 0,
|
||||
started_at = NULL,
|
||||
completed_at = NULL,
|
||||
error_code = NULL,
|
||||
error_message = NULL,
|
||||
scheduled_at = NOW(),
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
AND status IN ('failed', 'running')
|
||||
RETURNING id, owner_service, owner_ref, task_type, model_profile, priority, status,
|
||||
attempts, max_attempts, input, result, error_code, error_message,
|
||||
scheduled_at, started_at, completed_at, created_at, updated_at, idempotency_key
|
||||
`
|
||||
job, err := scanJob(s.pool.QueryRow(ctx, q, id))
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
return job, err
|
||||
}
|
||||
|
||||
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
||||
out := &model.Stats{At: time.Now().UTC()}
|
||||
|
||||
queueRows, err := s.pool.Query(ctx, `
|
||||
SELECT task_type, model_profile, status, count(*)
|
||||
FROM ai_jobs
|
||||
GROUP BY task_type, model_profile, status
|
||||
ORDER BY task_type, model_profile, status
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer queueRows.Close()
|
||||
for queueRows.Next() {
|
||||
var stat model.QueueStat
|
||||
if err := queueRows.Scan(&stat.TaskType, &stat.ModelProfile, &stat.Status, &stat.Total); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out.Queues = append(out.Queues, stat)
|
||||
}
|
||||
if err := queueRows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
errorRows, err := s.pool.Query(ctx, `
|
||||
SELECT task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown') AS error_code,
|
||||
count(*) AS total,
|
||||
count(*) FILTER (WHERE updated_at > NOW() - INTERVAL '24 hours') AS last_24h
|
||||
FROM ai_jobs
|
||||
WHERE status = 'failed'
|
||||
GROUP BY task_type, model_profile, COALESCE(NULLIF(error_code, ''), 'unknown')
|
||||
ORDER BY last_24h DESC, total DESC
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer errorRows.Close()
|
||||
for errorRows.Next() {
|
||||
var stat model.ErrorStat
|
||||
if err := errorRows.Scan(&stat.TaskType, &stat.ModelProfile, &stat.ErrorCode, &stat.Total, &stat.Last24h); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out.Errors = append(out.Errors, stat)
|
||||
}
|
||||
return out, errorRows.Err()
|
||||
}
|
||||
|
||||
func scanJob(row pgx.Row) (*model.Job, error) {
|
||||
var job model.Job
|
||||
var input []byte
|
||||
var result []byte
|
||||
err := row.Scan(
|
||||
&job.ID,
|
||||
&job.OwnerService,
|
||||
&job.OwnerRef,
|
||||
&job.TaskType,
|
||||
&job.ModelProfile,
|
||||
&job.Priority,
|
||||
&job.Status,
|
||||
&job.Attempts,
|
||||
&job.MaxAttempts,
|
||||
&input,
|
||||
&result,
|
||||
&job.ErrorCode,
|
||||
&job.ErrorMessage,
|
||||
&job.ScheduledAt,
|
||||
&job.StartedAt,
|
||||
&job.CompletedAt,
|
||||
&job.CreatedAt,
|
||||
&job.UpdatedAt,
|
||||
&job.IdempotencyKey,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
job.Input = json.RawMessage(input)
|
||||
if len(result) > 0 {
|
||||
job.Result = json.RawMessage(result)
|
||||
}
|
||||
return &job, nil
|
||||
}
|
||||
Reference in New Issue
Block a user