Files
monitoring-pf/internal/pf/store.go
Grendgi d53ecb2add
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 37s
CI / go (push) Successful in 43s
CI / python (push) Successful in 2s
Fix monitoring PF CI lint issues
2026-06-09 11:33:33 +03:00

680 lines
20 KiB
Go

package pf
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
)
var ErrNotFound = errors.New("not found")
var ErrTelegramRequired = errors.New("telegram required")
func (a *App) CurrentEmployee(ctx context.Context, portalUserID string, required bool) (*Employee, error) {
if portalUserID == "" {
if required {
return nil, ErrTelegramRequired
}
return nil, nil
}
emp, err := a.EmployeeByPortalUserID(ctx, portalUserID)
if errors.Is(err, sql.ErrNoRows) {
if required {
return nil, ErrTelegramRequired
}
return nil, nil
}
if err != nil {
return nil, err
}
if emp.TGChatID == nil || *emp.TGChatID == "" {
if required {
return nil, ErrTelegramRequired
}
}
return emp, nil
}
func (a *App) EmployeeByPortalUserID(ctx context.Context, portalUserID string) (*Employee, error) {
row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.portal_user_id = ?`, portalUserID)
return scanEmployee(row)
}
func (a *App) EmployeeByChatID(ctx context.Context, chatID string) (*Employee, error) {
row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.tg_chat_id = ?`, chatID)
return scanEmployee(row)
}
func (a *App) ListEmployees(ctx context.Context, isAdmin bool, current *Employee) ([]Employee, error) {
if !isAdmin {
if current == nil {
return []Employee{}, nil
}
return []Employee{*current}, nil
}
rows, err := a.DB.QueryContext(ctx, employeeSelect()+` ORDER BY e.name`)
if err != nil {
return nil, err
}
defer closeRows(rows)
return scanEmployees(rows)
}
type EmployeePayload struct {
Name string `json:"name"`
PortalUserID *string `json:"portal_user_id"`
TGUsername *string `json:"tg_username"`
TGChatID *string `json:"tg_chat_id"`
}
func (a *App) CreateEmployee(ctx context.Context, p EmployeePayload) (*Employee, error) {
name := cleanString(p.Name)
if name == "" {
return nil, fmt.Errorf("name is required")
}
username := cleanPtr(p.TGUsername)
if username != nil && len(*username) > 0 && (*username)[0] == '@' {
u := (*username)[1:]
username = &u
}
res, err := a.DB.ExecContext(ctx, `
INSERT INTO employees (name, portal_user_id, tg_chat_id, tg_username, created_at)
VALUES (?, ?, ?, ?, ?)`,
name, cleanPtr(p.PortalUserID), cleanPtr(p.TGChatID), username, dbNow(),
)
if err != nil {
return nil, err
}
id, _ := res.LastInsertId()
return a.EmployeeByID(ctx, id)
}
func (a *App) EmployeeByID(ctx context.Context, id int64) (*Employee, error) {
row := a.DB.QueryRowContext(ctx, employeeSelect()+` WHERE e.id = ?`, id)
return scanEmployee(row)
}
func (a *App) UpdateEmployee(ctx context.Context, id int64, p EmployeePayload) (*Employee, error) {
emp, err := a.EmployeeByID(ctx, id)
if err != nil {
return nil, ErrNotFound
}
name := cleanString(p.Name)
if name == "" {
name = emp.Name
}
username := cleanPtr(p.TGUsername)
if username != nil && len(*username) > 0 && (*username)[0] == '@' {
u := (*username)[1:]
username = &u
}
if _, err := a.DB.ExecContext(ctx, `
UPDATE employees
SET name = ?, portal_user_id = COALESCE(?, portal_user_id), tg_username = ?, tg_chat_id = ?
WHERE id = ?`,
name, cleanPtr(p.PortalUserID), username, cleanPtr(p.TGChatID), id,
); err != nil {
return nil, err
}
return a.EmployeeByID(ctx, id)
}
func (a *App) DeleteEmployee(ctx context.Context, id int64) error {
var count int64
if err := a.DB.QueryRowContext(ctx, `SELECT count(*) FROM projects WHERE owner_id = ?`, id).Scan(&count); err != nil {
return err
}
if count > 0 {
return fmt.Errorf("employee has projects")
}
res, err := a.DB.ExecContext(ctx, `DELETE FROM employees WHERE id = ?`, id)
if err != nil {
return err
}
affected, _ := res.RowsAffected()
if affected == 0 {
return ErrNotFound
}
return nil
}
func (a *App) LinkTelegram(ctx context.Context, portalUserID, chatID, username, name string) (*Employee, error) {
if existing, err := a.EmployeeByChatID(ctx, chatID); err == nil {
if existing.PortalUserID != nil && *existing.PortalUserID != "" && *existing.PortalUserID != portalUserID {
return nil, fmt.Errorf("telegram belongs to another portal user")
}
if existing.PortalUserID == nil || *existing.PortalUserID == "" {
if _, err := a.DB.ExecContext(ctx, `
UPDATE employees SET portal_user_id = ?, tg_username = ? WHERE id = ?`,
portalUserID, nullIfEmpty(username), existing.ID,
); err != nil {
return nil, err
}
}
return a.EmployeeByID(ctx, existing.ID)
}
if emp, err := a.EmployeeByPortalUserID(ctx, portalUserID); err == nil {
if emp.TGChatID != nil && *emp.TGChatID != "" && *emp.TGChatID != chatID {
return nil, fmt.Errorf("portal user belongs to another telegram")
}
_, err := a.DB.ExecContext(ctx, `
UPDATE employees SET tg_chat_id = ?, tg_username = ?, name = COALESCE(NULLIF(name, ''), ?) WHERE id = ?`,
chatID, nullIfEmpty(username), name, emp.ID,
)
if err != nil {
return nil, err
}
return a.EmployeeByID(ctx, emp.ID)
}
res, err := a.DB.ExecContext(ctx, `
INSERT INTO employees (name, portal_user_id, tg_chat_id, tg_username, created_at)
VALUES (?, ?, ?, ?, ?)`,
name, portalUserID, chatID, nullIfEmpty(username), dbNow(),
)
if err != nil {
return nil, err
}
id, _ := res.LastInsertId()
return a.EmployeeByID(ctx, id)
}
func employeeSelect() string {
return `
SELECT e.id, e.name, e.portal_user_id, e.tg_chat_id, e.tg_username, e.created_at,
(SELECT count(*) FROM projects p WHERE p.owner_id = e.id) AS projects_total
FROM employees e`
}
type rowScanner interface {
Scan(dest ...any) error
}
func scanEmployee(row rowScanner) (*Employee, error) {
var emp Employee
var portal, chat, username, created sql.NullString
if err := row.Scan(&emp.ID, &emp.Name, &portal, &chat, &username, &created, &emp.ProjectsTotal); err != nil {
return nil, err
}
emp.PortalUserID = nullableString(portal)
emp.TGChatID = nullableString(chat)
emp.TGUsername = nullableString(username)
emp.CreatedAt = timeOut(created)
return &emp, nil
}
func scanEmployees(rows *sql.Rows) ([]Employee, error) {
items := []Employee{}
for rows.Next() {
item, err := scanEmployee(rows)
if err != nil {
return nil, err
}
items = append(items, *item)
}
return items, rows.Err()
}
type ProjectPayload struct {
Title string `json:"title"`
DealType string `json:"deal_type"`
OurPrice *float64 `json:"our_price"`
Notes *string `json:"notes"`
DLDPermit *string `json:"dld_permit"`
Building *string `json:"building"`
Bedrooms *int64 `json:"bedrooms"`
SizeSqft *float64 `json:"size_sqft"`
OurURL *string `json:"our_url"`
}
func (a *App) Summary(ctx context.Context, emp *Employee) (map[string]any, error) {
out := map[string]any{
"projects_total": 0,
"employees_total": 0,
"listings_total": 0,
"listings_active": 0,
"listings_removed": 0,
"scrape_interval_hours": a.Cfg.ScrapeIntervalHours,
"bayut_enabled": false,
}
if emp == nil {
return out, nil
}
var projects int64
var listings, active, removed sql.NullInt64
err := a.DB.QueryRowContext(ctx, `SELECT count(*) FROM projects WHERE owner_id = ?`, emp.ID).Scan(&projects)
if err != nil {
return nil, err
}
err = a.DB.QueryRowContext(ctx, `
SELECT count(*),
sum(CASE WHEN status IN ('ACTIVE','active') THEN 1 ELSE 0 END),
sum(CASE WHEN status IN ('REMOVED','removed') THEN 1 ELSE 0 END)
FROM competitor_listings l JOIN projects p ON p.id = l.project_id
WHERE p.owner_id = ?`, emp.ID).Scan(&listings, &active, &removed)
if err != nil {
return nil, err
}
out["projects_total"] = projects
out["employees_total"] = 1
out["listings_total"] = nullIntValue(listings)
out["listings_active"] = nullIntValue(active)
out["listings_removed"] = nullIntValue(removed)
return out, nil
}
func (a *App) ListProjects(ctx context.Context, ownerID int64) ([]Project, error) {
rows, err := a.DB.QueryContext(ctx, projectSelect()+` WHERE p.owner_id = ? ORDER BY p.created_at DESC`, ownerID)
if err != nil {
return nil, err
}
defer closeRows(rows)
items := []Project{}
for rows.Next() {
p, err := a.scanProject(ctx, rows, false)
if err != nil {
return nil, err
}
items = append(items, *p)
}
return items, rows.Err()
}
func (a *App) ProjectByID(ctx context.Context, ownerID, projectID int64, detail bool) (*Project, error) {
row := a.DB.QueryRowContext(ctx, projectSelect()+` WHERE p.id = ? AND p.owner_id = ?`, projectID, ownerID)
p, err := a.scanProject(ctx, row, detail)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, err
}
return p, nil
}
func (a *App) CreateProject(ctx context.Context, ownerID int64, p ProjectPayload) (*Project, error) {
title := cleanString(p.Title)
if title == "" {
return nil, fmt.Errorf("title is required")
}
deal, err := enumDealIn(p.DealType)
if err != nil {
return nil, err
}
res, err := a.DB.ExecContext(ctx, `
INSERT INTO projects
(title, deal_type, owner_id, our_price, notes, dld_permit, building, bedrooms, size_sqft, our_url, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
title, deal, ownerID, p.OurPrice, cleanPtr(p.Notes), cleanPtr(p.DLDPermit), cleanPtr(p.Building),
p.Bedrooms, p.SizeSqft, cleanPtr(p.OurURL), dbNow(),
)
if err != nil {
return nil, err
}
id, _ := res.LastInsertId()
return a.ProjectByID(ctx, ownerID, id, true)
}
func (a *App) UpdateProject(ctx context.Context, ownerID, projectID int64, p ProjectPayload) (*Project, error) {
current, err := a.ProjectByID(ctx, ownerID, projectID, false)
if err != nil {
return nil, err
}
title := cleanString(p.Title)
if title == "" {
title = current.Title
}
deal := "SALE"
if current.DealType == "rent" {
deal = "RENT"
}
if p.DealType != "" {
deal, err = enumDealIn(p.DealType)
if err != nil {
return nil, err
}
}
_, err = a.DB.ExecContext(ctx, `
UPDATE projects
SET title = ?, deal_type = ?, our_price = ?, notes = ?, dld_permit = ?,
building = ?, bedrooms = ?, size_sqft = ?, our_url = ?
WHERE id = ? AND owner_id = ?`,
title, deal, p.OurPrice, cleanPtr(p.Notes), cleanPtr(p.DLDPermit), cleanPtr(p.Building),
p.Bedrooms, p.SizeSqft, cleanPtr(p.OurURL), projectID, ownerID,
)
if err != nil {
return nil, err
}
return a.ProjectByID(ctx, ownerID, projectID, true)
}
func (a *App) DeleteProject(ctx context.Context, ownerID, projectID int64) error {
tx, err := a.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer rollbackTx(tx)
listingRows, err := tx.QueryContext(ctx, `SELECT id FROM competitor_listings WHERE project_id = ?`, projectID)
if err != nil {
return err
}
listingIDs := []int64{}
for listingRows.Next() {
var id int64
if err := listingRows.Scan(&id); err != nil {
closeRows(listingRows)
return err
}
listingIDs = append(listingIDs, id)
}
closeRows(listingRows)
for _, id := range listingIDs {
if _, err := tx.ExecContext(ctx, `DELETE FROM price_history WHERE listing_id = ?`, id); err != nil {
return err
}
}
if _, err := tx.ExecContext(ctx, `DELETE FROM competitor_listings WHERE project_id = ?`, projectID); err != nil {
return err
}
res, err := tx.ExecContext(ctx, `DELETE FROM projects WHERE id = ? AND owner_id = ?`, projectID, ownerID)
if err != nil {
return err
}
affected, _ := res.RowsAffected()
if affected == 0 {
return ErrNotFound
}
return tx.Commit()
}
func (a *App) DeleteListing(ctx context.Context, ownerID, listingID int64) error {
tx, err := a.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer rollbackTx(tx)
if _, err := tx.ExecContext(ctx, `
DELETE FROM price_history
WHERE listing_id IN (
SELECT l.id
FROM competitor_listings l
JOIN projects p ON p.id = l.project_id
WHERE l.id = ? AND p.owner_id = ?
)`, listingID, ownerID); err != nil {
return err
}
res, err := tx.ExecContext(ctx, `
DELETE FROM competitor_listings
WHERE id = ?
AND project_id IN (SELECT id FROM projects WHERE owner_id = ?)`, listingID, ownerID)
if err != nil {
return err
}
affected, _ := res.RowsAffected()
if affected == 0 {
return ErrNotFound
}
return tx.Commit()
}
func (a *App) ListingByID(ctx context.Context, id int64, withHistory bool) (*Listing, error) {
row := a.DB.QueryRowContext(ctx, listingSelect()+` WHERE l.id = ?`, id)
item, err := scanListing(row, withHistory)
if err != nil {
return nil, err
}
if withHistory {
history, err := a.PriceHistory(ctx, id)
if err != nil {
return nil, err
}
item.PriceHistory = history
}
return item, nil
}
func projectSelect() string {
return `
SELECT p.id, p.title, p.deal_type, p.our_price, p.notes, p.dld_permit, p.building,
p.bedrooms, p.size_sqft, p.our_url, p.owner_id, p.created_at, p.last_checked_at,
(SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id),
(SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('ACTIVE','active')),
(SELECT count(*) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('REMOVED','removed')),
(SELECT min(l.current_price) FROM competitor_listings l WHERE l.project_id = p.id AND l.status IN ('ACTIVE','active') AND l.current_price IS NOT NULL),
e.id, e.name, e.portal_user_id, e.tg_chat_id, e.tg_username, e.created_at,
(SELECT count(*) FROM projects owner_p WHERE owner_p.owner_id = e.id) AS owner_projects_total
FROM projects p
LEFT JOIN employees e ON e.id = p.owner_id`
}
func (a *App) scanProject(ctx context.Context, row rowScanner, detail bool) (*Project, error) {
var p Project
var deal string
var price, size, minPrice sql.NullFloat64
var notes, permit, building, ourURL, created, checked sql.NullString
var bedrooms sql.NullInt64
var ownerID, ownerProjectsTotal sql.NullInt64
var ownerName, ownerPortalID, ownerChatID, ownerUsername, ownerCreated sql.NullString
if err := row.Scan(
&p.ID, &p.Title, &deal, &price, &notes, &permit, &building, &bedrooms, &size, &ourURL,
&p.OwnerID, &created, &checked, &p.ListingsTotal, &p.ListingsActive, &p.ListingsRemoved, &minPrice,
&ownerID, &ownerName, &ownerPortalID, &ownerChatID, &ownerUsername, &ownerCreated, &ownerProjectsTotal,
); err != nil {
return nil, err
}
p.DealType = enumDealOut(deal)
p.OurPrice = nullableFloat(price)
p.Notes = nullableString(notes)
p.DLDPermit = nullableString(permit)
p.Building = nullableString(building)
p.Bedrooms = nullableInt(bedrooms)
p.SizeSqft = nullableFloat(size)
p.OurURL = nullableString(ourURL)
p.CreatedAt = timeOut(created)
p.LastCheckedAt = timeOut(checked)
p.MinCompetitorPrice = nullableFloat(minPrice)
if ownerID.Valid {
p.Owner = &Employee{
ID: ownerID.Int64,
Name: ownerName.String,
PortalUserID: nullableString(ownerPortalID),
TGChatID: nullableString(ownerChatID),
TGUsername: nullableString(ownerUsername),
ProjectsTotal: nullIntValue(ownerProjectsTotal),
CreatedAt: timeOut(ownerCreated),
}
}
if detail {
listings, err := a.ListingsForProject(ctx, p.ID, true)
if err != nil {
return nil, err
}
p.Listings = listings
}
return &p, nil
}
func (a *App) ListingsForProject(ctx context.Context, projectID int64, withHistory bool) ([]Listing, error) {
rows, err := a.DB.QueryContext(ctx, listingSelect()+` WHERE l.project_id = ? ORDER BY l.first_seen_at DESC`, projectID)
if err != nil {
return nil, err
}
items := []Listing{}
for rows.Next() {
item, err := scanListing(rows, false)
if err != nil {
return nil, err
}
items = append(items, *item)
}
if err := rows.Err(); err != nil {
closeRows(rows)
return nil, err
}
closeRows(rows)
if withHistory {
for i := range items {
history, err := a.PriceHistory(ctx, items[i].ID)
if err != nil {
return nil, err
}
items[i].PriceHistory = history
}
}
return items, nil
}
func (a *App) TeamOverview(ctx context.Context, portalUserIDs []string, all bool) ([]TeamOverviewRow, error) {
args := []any{}
where := ""
if !all {
if len(portalUserIDs) == 0 {
return []TeamOverviewRow{}, nil
}
placeholders := make([]string, 0, len(portalUserIDs))
for _, id := range portalUserIDs {
placeholders = append(placeholders, "?")
args = append(args, id)
}
where = "WHERE e.portal_user_id IN (" + strings.Join(placeholders, ",") + ")"
}
rows, err := a.DB.QueryContext(ctx, `
SELECT e.id,
e.name,
e.portal_user_id,
e.tg_chat_id,
p.id,
p.title,
p.deal_type,
p.dld_permit,
p.last_checked_at,
count(l.id),
sum(CASE WHEN l.status IN ('ACTIVE','active') THEN 1 ELSE 0 END),
sum(CASE WHEN l.status IN ('REMOVED','removed') THEN 1 ELSE 0 END),
min(CASE WHEN l.status IN ('ACTIVE','active') THEN l.current_price ELSE NULL END)
FROM employees e
LEFT JOIN projects p ON p.owner_id = e.id
LEFT JOIN competitor_listings l ON l.project_id = p.id
`+where+`
GROUP BY e.id, e.name, e.portal_user_id, e.tg_chat_id,
p.id, p.title, p.deal_type, p.dld_permit, p.last_checked_at, p.created_at
ORDER BY e.name COLLATE NOCASE, p.created_at DESC`, args...)
if err != nil {
return nil, err
}
defer closeRows(rows)
items := []TeamOverviewRow{}
for rows.Next() {
var item TeamOverviewRow
var portalID, chatID, title, deal, permit, checked sql.NullString
var projectID, total, active, removed sql.NullInt64
var minPrice sql.NullFloat64
if err := rows.Scan(
&item.EmployeeID,
&item.EmployeeName,
&portalID,
&chatID,
&projectID,
&title,
&deal,
&permit,
&checked,
&total,
&active,
&removed,
&minPrice,
); err != nil {
return nil, err
}
item.PortalUserID = nullableString(portalID)
item.TelegramLinked = chatID.Valid && chatID.String != ""
item.ProjectID = nullableInt(projectID)
item.ProjectTitle = nullableString(title)
if deal.Valid {
value := enumDealOut(deal.String)
item.DealType = &value
}
item.DLDPermit = nullableString(permit)
item.LastCheckedAt = timeOut(checked)
item.ListingsTotal = nullIntValue(total)
item.ListingsActive = nullIntValue(active)
item.ListingsRemoved = nullIntValue(removed)
item.MinCompetitorPrice = nullableFloat(minPrice)
items = append(items, item)
}
return items, rows.Err()
}
func (a *App) PriceHistory(ctx context.Context, listingID int64) ([]PricePoint, error) {
rows, err := a.DB.QueryContext(ctx, `
SELECT id, price, recorded_at
FROM price_history
WHERE listing_id = ?
ORDER BY recorded_at DESC`, listingID)
if err != nil {
return nil, err
}
defer closeRows(rows)
out := []PricePoint{}
for rows.Next() {
var p PricePoint
var price sql.NullFloat64
var recorded sql.NullString
if err := rows.Scan(&p.ID, &price, &recorded); err != nil {
return nil, err
}
p.Price = nullableFloat(price)
p.RecordedAt = timeOut(recorded)
out = append(out, p)
}
return out, rows.Err()
}
func listingSelect() string {
return `
SELECT l.id, l.project_id, l.source, l.external_id, l.url, l.title, l.agent_name,
l.agency_name, l.permit_number, l.auto_discovered, l.current_price, l.currency, l.status, l.first_seen_at, l.last_seen_at
FROM competitor_listings l`
}
func scanListing(row rowScanner, _ bool) (*Listing, error) {
var l Listing
var source, status string
var title, agent, agency, permit, currency, firstSeen, lastSeen sql.NullString
var price sql.NullFloat64
var autoDiscovered bool
if err := row.Scan(
&l.ID, &l.ProjectID, &source, &l.ExternalID, &l.URL, &title, &agent, &agency,
&permit, &autoDiscovered, &price, &currency, &status, &firstSeen, &lastSeen,
); err != nil {
return nil, err
}
l.Source = enumSourceOut(source)
l.Title = nullableString(title)
l.AgentName = nullableString(agent)
l.AgencyName = nullableString(agency)
l.PermitNumber = nullableString(permit)
l.AutoDiscovered = autoDiscovered
l.CurrentPrice = nullableFloat(price)
l.Currency = nullableString(currency)
l.Status = enumStatusOut(status)
l.FirstSeenAt = timeOut(firstSeen)
l.LastSeenAt = timeOut(lastSeen)
return &l, nil
}
func nullIfEmpty(value string) *string {
value = cleanString(value)
if value == "" {
return nil
}
return &value
}
func nullIntValue(value sql.NullInt64) int64 {
if !value.Valid {
return 0
}
return value.Int64
}