Compare commits
3 Commits
c618ffaff9
...
773f53f790
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
773f53f790 | ||
|
|
abc64214d2 | ||
|
|
e45884c5e5 |
35
.gitea/scripts/hygiene-check.sh
Normal file
35
.gitea/scripts/hygiene-check.sh
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
fail=0
|
||||||
|
|
||||||
|
while IFS= read -r -d '' path; do
|
||||||
|
base="$(basename "$path")"
|
||||||
|
case "$base" in
|
||||||
|
.DS_Store|.env)
|
||||||
|
echo "::error file=$path::tracked local-only file is forbidden"
|
||||||
|
fail=1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
|
||||||
|
case "$path" in
|
||||||
|
*node_modules/*|node_modules/*)
|
||||||
|
echo "::error file=$path::tracked node_modules content is forbidden"
|
||||||
|
fail=1
|
||||||
|
;;
|
||||||
|
*.tmp|*.temp|*.bak|*.orig|*.rej|*.zip|*.tar|*.tar.gz|*.tgz|*.rar|*.7z)
|
||||||
|
echo "::error file=$path::tracked temporary/archive artifact is forbidden"
|
||||||
|
fail=1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
|
||||||
|
if [ -f "$path" ]; then
|
||||||
|
size="$(wc -c < "$path" | tr -d ' ')"
|
||||||
|
if [ "${size:-0}" -gt 52428800 ]; then
|
||||||
|
echo "::error file=$path::tracked file is larger than 50 MiB"
|
||||||
|
fail=1
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done < <(git ls-files -z)
|
||||||
|
|
||||||
|
exit "$fail"
|
||||||
@@ -5,8 +5,15 @@ on:
|
|||||||
pull_request:
|
pull_request:
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
hygiene:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- run: bash .gitea/scripts/hygiene-check.sh
|
||||||
|
|
||||||
test:
|
test:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
needs: hygiene
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- uses: actions/setup-go@v5
|
- uses: actions/setup-go@v5
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) {
|
|||||||
ctx, cancel := contextWithTimeout(r, 12*time.Second)
|
ctx, cancel := contextWithTimeout(r, 12*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
stats, err := s.store.Stats(ctx)
|
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -337,7 +337,7 @@ func (s *Server) handleFailJob(w http.ResponseWriter, r *http.Request, path stri
|
|||||||
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
ctx, cancel := contextWithTimeout(r, 8*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
stats, err := s.store.Stats(ctx)
|
stats, err := s.store.Stats(ctx, s.cfg.WorkerLeaseTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -135,8 +135,12 @@ type BacklogStat struct {
|
|||||||
ModelProfile string `json:"model_profile"`
|
ModelProfile string `json:"model_profile"`
|
||||||
Pending int64 `json:"pending"`
|
Pending int64 `json:"pending"`
|
||||||
Running int64 `json:"running"`
|
Running int64 `json:"running"`
|
||||||
|
StaleRunning int64 `json:"stale_running"`
|
||||||
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
|
OldestPendingAgeSeconds int64 `json:"oldest_pending_age_seconds"`
|
||||||
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
|
OldestPendingScheduledAt string `json:"oldest_pending_scheduled_at,omitempty"`
|
||||||
|
OldestRunningAgeSeconds int64 `json:"oldest_running_age_seconds"`
|
||||||
|
OldestRunningStartedAt string `json:"oldest_running_started_at,omitempty"`
|
||||||
|
LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type OwnerStat struct {
|
type OwnerStat struct {
|
||||||
|
|||||||
@@ -41,17 +41,48 @@ func Open(ctx context.Context, databaseURL string) (*Store, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("parse database url: %w", err)
|
return nil, fmt.Errorf("parse database url: %w", err)
|
||||||
}
|
}
|
||||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
pool, err := connectWithRetry(ctx, cfg, 2*time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("connect postgres: %w", err)
|
return nil, err
|
||||||
}
|
|
||||||
if err := pool.Ping(ctx); err != nil {
|
|
||||||
pool.Close()
|
|
||||||
return nil, fmt.Errorf("ping postgres: %w", err)
|
|
||||||
}
|
}
|
||||||
return &Store{pool: pool}, nil
|
return &Store{pool: pool}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func connectWithRetry(ctx context.Context, cfg *pgxpool.Config, maxWait time.Duration) (*pgxpool.Pool, error) {
|
||||||
|
deadline := time.Now().Add(maxWait)
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for attempt := 1; ; attempt++ {
|
||||||
|
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||||
|
if err == nil {
|
||||||
|
if pingErr := pool.Ping(ctx); pingErr == nil {
|
||||||
|
return pool, nil
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("ping postgres: %w", pingErr)
|
||||||
|
pool.Close()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("connect postgres: %w", err)
|
||||||
|
}
|
||||||
|
lastErr = err
|
||||||
|
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
return nil, fmt.Errorf("connect postgres after retry: %w", lastErr)
|
||||||
|
}
|
||||||
|
sleep := time.Duration(attempt) * time.Second
|
||||||
|
if sleep > 5*time.Second {
|
||||||
|
sleep = 5 * time.Second
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(sleep)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
timer.Stop()
|
||||||
|
return nil, fmt.Errorf("connect postgres cancelled: %w", ctx.Err())
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) Close() {
|
func (s *Store) Close() {
|
||||||
s.pool.Close()
|
s.pool.Close()
|
||||||
}
|
}
|
||||||
@@ -627,7 +658,10 @@ WHERE j.id = picked.id
|
|||||||
return int(tag.RowsAffected()), nil
|
return int(tag.RowsAffected()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Stats(ctx context.Context) (*model.Stats, error) {
|
func (s *Store) Stats(ctx context.Context, staleAfter time.Duration) (*model.Stats, error) {
|
||||||
|
if staleAfter <= 0 {
|
||||||
|
staleAfter = 15 * time.Minute
|
||||||
|
}
|
||||||
out := &model.Stats{At: time.Now().UTC()}
|
out := &model.Stats{At: time.Now().UTC()}
|
||||||
|
|
||||||
queueRows, err := s.pool.Query(ctx, `
|
queueRows, err := s.pool.Query(ctx, `
|
||||||
@@ -741,13 +775,20 @@ SELECT owner_service,
|
|||||||
model_profile,
|
model_profile,
|
||||||
count(*) FILTER (WHERE status = 'pending') AS pending,
|
count(*) FILTER (WHERE status = 'pending') AS pending,
|
||||||
count(*) FILTER (WHERE status = 'running') AS running,
|
count(*) FILTER (WHERE status = 'running') AS running,
|
||||||
|
count(*) FILTER (
|
||||||
|
WHERE status = 'running'
|
||||||
|
AND COALESCE(heartbeat_at, started_at, updated_at) < NOW() - make_interval(secs => $1)
|
||||||
|
) AS stale_running,
|
||||||
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
|
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(scheduled_at) FILTER (WHERE status = 'pending')))::bigint, 0) AS oldest_pending_age_seconds,
|
||||||
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at
|
MIN(scheduled_at) FILTER (WHERE status = 'pending') AS oldest_pending_scheduled_at,
|
||||||
|
COALESCE(EXTRACT(EPOCH FROM (NOW() - MIN(started_at) FILTER (WHERE status = 'running')))::bigint, 0) AS oldest_running_age_seconds,
|
||||||
|
MIN(started_at) FILTER (WHERE status = 'running') AS oldest_running_started_at,
|
||||||
|
MAX(heartbeat_at) FILTER (WHERE status = 'running') AS last_heartbeat_at
|
||||||
FROM ai_jobs
|
FROM ai_jobs
|
||||||
WHERE status IN ('pending', 'running')
|
WHERE status IN ('pending', 'running')
|
||||||
GROUP BY owner_service, task_type, model_profile
|
GROUP BY owner_service, task_type, model_profile
|
||||||
ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
ORDER BY stale_running DESC, pending DESC, running DESC, owner_service, task_type, model_profile
|
||||||
`)
|
`, int(staleAfter.Seconds()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -755,20 +796,32 @@ ORDER BY pending DESC, running DESC, owner_service, task_type, model_profile
|
|||||||
for backlogRows.Next() {
|
for backlogRows.Next() {
|
||||||
var stat model.BacklogStat
|
var stat model.BacklogStat
|
||||||
var oldestPendingScheduledAt *time.Time
|
var oldestPendingScheduledAt *time.Time
|
||||||
|
var oldestRunningStartedAt *time.Time
|
||||||
|
var lastHeartbeatAt *time.Time
|
||||||
if err := backlogRows.Scan(
|
if err := backlogRows.Scan(
|
||||||
&stat.OwnerService,
|
&stat.OwnerService,
|
||||||
&stat.TaskType,
|
&stat.TaskType,
|
||||||
&stat.ModelProfile,
|
&stat.ModelProfile,
|
||||||
&stat.Pending,
|
&stat.Pending,
|
||||||
&stat.Running,
|
&stat.Running,
|
||||||
|
&stat.StaleRunning,
|
||||||
&stat.OldestPendingAgeSeconds,
|
&stat.OldestPendingAgeSeconds,
|
||||||
&oldestPendingScheduledAt,
|
&oldestPendingScheduledAt,
|
||||||
|
&stat.OldestRunningAgeSeconds,
|
||||||
|
&oldestRunningStartedAt,
|
||||||
|
&lastHeartbeatAt,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if oldestPendingScheduledAt != nil {
|
if oldestPendingScheduledAt != nil {
|
||||||
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
|
stat.OldestPendingScheduledAt = oldestPendingScheduledAt.UTC().Format(time.RFC3339)
|
||||||
}
|
}
|
||||||
|
if oldestRunningStartedAt != nil {
|
||||||
|
stat.OldestRunningStartedAt = oldestRunningStartedAt.UTC().Format(time.RFC3339)
|
||||||
|
}
|
||||||
|
if lastHeartbeatAt != nil {
|
||||||
|
stat.LastHeartbeatAt = lastHeartbeatAt.UTC().Format(time.RFC3339)
|
||||||
|
}
|
||||||
out.Backlog = append(out.Backlog, stat)
|
out.Backlog = append(out.Backlog, stat)
|
||||||
}
|
}
|
||||||
return out, backlogRows.Err()
|
return out, backlogRows.Err()
|
||||||
|
|||||||
Reference in New Issue
Block a user