Files
monitoring-pf/internal/pf/db.go
Grendgi 1b8382a6ca
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 35s
Delay PF permit competitor removal
2026-06-05 12:36:15 +03:00

377 lines
9.7 KiB
Go

package pf
import (
"context"
"database/sql"
"fmt"
"net/url"
"path/filepath"
"strings"
"time"
_ "modernc.org/sqlite"
)
type App struct {
Cfg Config
DB *sql.DB
Worker *Worker
TG *Telegram
}
type Employee struct {
ID int64 `json:"id"`
Name string `json:"name"`
PortalUserID *string `json:"portal_user_id"`
TGChatID *string `json:"tg_chat_id"`
TGUsername *string `json:"tg_username"`
ProjectsTotal int64 `json:"projects_total"`
CreatedAt *string `json:"created_at"`
}
type Project struct {
ID int64 `json:"id"`
Title string `json:"title"`
DealType string `json:"deal_type"`
OurPrice *float64 `json:"our_price"`
Notes *string `json:"notes"`
DLDPermit *string `json:"dld_permit"`
Building *string `json:"building"`
Bedrooms *int64 `json:"bedrooms"`
SizeSqft *float64 `json:"size_sqft"`
OurURL *string `json:"our_url"`
OwnerID int64 `json:"owner_id"`
Owner *Employee `json:"owner,omitempty"`
CreatedAt *string `json:"created_at"`
LastCheckedAt *string `json:"last_checked_at"`
ListingsTotal int64 `json:"listings_total"`
ListingsActive int64 `json:"listings_active"`
ListingsRemoved int64 `json:"listings_removed"`
MinCompetitorPrice *float64 `json:"min_competitor_price"`
Listings []Listing `json:"listings,omitempty"`
}
type Listing struct {
ID int64 `json:"id"`
ProjectID int64 `json:"project_id"`
Source string `json:"source"`
ExternalID string `json:"external_id"`
URL string `json:"url"`
Title *string `json:"title"`
AgentName *string `json:"agent_name"`
AgencyName *string `json:"agency_name"`
PermitNumber *string `json:"permit_number"`
AutoDiscovered bool `json:"auto_discovered"`
CurrentPrice *float64 `json:"current_price"`
Currency *string `json:"currency"`
Status string `json:"status"`
FirstSeenAt *string `json:"first_seen_at"`
LastSeenAt *string `json:"last_seen_at"`
PriceHistory []PricePoint `json:"price_history,omitempty"`
}
type PricePoint struct {
ID int64 `json:"id"`
Price *float64 `json:"price"`
RecordedAt *string `json:"recorded_at"`
}
type TeamOverviewRow struct {
EmployeeID int64 `json:"employee_id"`
EmployeeName string `json:"employee_name"`
PortalUserID *string `json:"portal_user_id"`
TelegramLinked bool `json:"telegram_linked"`
ProjectID *int64 `json:"project_id"`
ProjectTitle *string `json:"project_title"`
DealType *string `json:"deal_type"`
DLDPermit *string `json:"dld_permit"`
LastCheckedAt *string `json:"last_checked_at"`
ListingsTotal int64 `json:"listings_total"`
ListingsActive int64 `json:"listings_active"`
ListingsRemoved int64 `json:"listings_removed"`
MinCompetitorPrice *float64 `json:"min_competitor_price"`
}
func OpenApp(ctx context.Context, cfg Config) (*App, error) {
db, err := sql.Open("sqlite", sqliteDSN(cfg.DatabaseURL))
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
if err := db.PingContext(ctx); err != nil {
_ = db.Close()
return nil, err
}
app := &App{
Cfg: cfg,
DB: db,
Worker: NewWorker(cfg),
TG: NewTelegram(cfg.TGBotToken),
}
if err := app.InitDB(ctx); err != nil {
_ = db.Close()
return nil, err
}
return app, nil
}
func sqliteDSN(databaseURL string) string {
const prefix = "sqlite:///"
if strings.HasPrefix(databaseURL, prefix) {
path := strings.TrimPrefix(databaseURL, prefix)
if !strings.HasPrefix(path, "/") {
path = filepath.Clean(path)
}
return path
}
if strings.HasPrefix(databaseURL, "sqlite://") {
u, err := url.Parse(databaseURL)
if err == nil && u.Path != "" {
return u.Path
}
}
return databaseURL
}
func (a *App) Close() error {
return a.DB.Close()
}
func (a *App) InitDB(ctx context.Context) error {
stmts := []string{
`CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY,
name VARCHAR(200) NOT NULL,
portal_user_id VARCHAR(100),
tg_chat_id VARCHAR(64),
tg_username VARCHAR(200),
created_at DATETIME
)`,
`CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_portal_user_id ON employees (portal_user_id)`,
`CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_tg_chat_id ON employees (tg_chat_id)`,
`CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY,
title VARCHAR(300) NOT NULL,
deal_type VARCHAR(4) NOT NULL,
our_price FLOAT,
notes TEXT,
dld_permit VARCHAR(100),
building VARCHAR(300),
bedrooms INTEGER,
size_sqft FLOAT,
our_url TEXT,
owner_id INTEGER NOT NULL,
created_at DATETIME,
last_checked_at DATETIME,
FOREIGN KEY(owner_id) REFERENCES employees(id)
)`,
`CREATE INDEX IF NOT EXISTS ix_projects_dld_permit ON projects (dld_permit)`,
`CREATE TABLE IF NOT EXISTS competitor_listings (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL,
source VARCHAR(14) NOT NULL,
external_id VARCHAR(100) NOT NULL,
url TEXT NOT NULL,
title VARCHAR(500),
agent_name VARCHAR(300),
agency_name VARCHAR(300),
permit_number VARCHAR(100),
auto_discovered BOOLEAN NOT NULL DEFAULT 0,
permit_missing_checks INTEGER NOT NULL DEFAULT 0,
current_price FLOAT,
currency VARCHAR(10),
status VARCHAR(7) NOT NULL,
first_seen_at DATETIME,
last_seen_at DATETIME,
FOREIGN KEY(project_id) REFERENCES projects(id)
)`,
`CREATE UNIQUE INDEX IF NOT EXISTS uq_listing ON competitor_listings (project_id, source, external_id)`,
`CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY,
listing_id INTEGER NOT NULL,
price FLOAT,
recorded_at DATETIME,
FOREIGN KEY(listing_id) REFERENCES competitor_listings(id)
)`,
}
for _, stmt := range stmts {
if _, err := a.DB.ExecContext(ctx, stmt); err != nil {
return err
}
}
if err := a.migrateEmployees(ctx); err != nil {
return err
}
return a.migrateCompetitorListings(ctx)
}
func (a *App) migrateEmployees(ctx context.Context) error {
rows, err := a.DB.QueryContext(ctx, `PRAGMA table_info(employees)`)
if err != nil {
return err
}
defer rows.Close()
columns := map[string]bool{}
for rows.Next() {
var cid int
var name, typ string
var notNull int
var defaultValue any
var pk int
if err := rows.Scan(&cid, &name, &typ, &notNull, &defaultValue, &pk); err != nil {
return err
}
columns[name] = true
}
if !columns["portal_user_id"] {
if _, err := a.DB.ExecContext(ctx, `ALTER TABLE employees ADD COLUMN portal_user_id VARCHAR(100)`); err != nil {
return err
}
}
_, err = a.DB.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS ix_employees_portal_user_id ON employees (portal_user_id)`)
return err
}
func (a *App) migrateCompetitorListings(ctx context.Context) error {
rows, err := a.DB.QueryContext(ctx, `PRAGMA table_info(competitor_listings)`)
if err != nil {
return err
}
defer rows.Close()
columns := map[string]bool{}
for rows.Next() {
var cid int
var name, typ string
var notNull int
var defaultValue any
var pk int
if err := rows.Scan(&cid, &name, &typ, &notNull, &defaultValue, &pk); err != nil {
return err
}
columns[name] = true
}
if !columns["permit_number"] {
if _, err := a.DB.ExecContext(ctx, `ALTER TABLE competitor_listings ADD COLUMN permit_number VARCHAR(100)`); err != nil {
return err
}
}
if !columns["auto_discovered"] {
if _, err := a.DB.ExecContext(ctx, `ALTER TABLE competitor_listings ADD COLUMN auto_discovered BOOLEAN NOT NULL DEFAULT 0`); err != nil {
return err
}
}
if !columns["permit_missing_checks"] {
if _, err := a.DB.ExecContext(ctx, `ALTER TABLE competitor_listings ADD COLUMN permit_missing_checks INTEGER NOT NULL DEFAULT 0`); err != nil {
return err
}
}
return nil
}
func cleanPtr(value *string) *string {
if value == nil {
return nil
}
v := strings.TrimSpace(*value)
if v == "" {
return nil
}
return &v
}
func cleanString(value string) string {
return strings.TrimSpace(value)
}
func dbNow() string {
return time.Now().UTC().Format("2006-01-02 15:04:05.000000")
}
func enumDealIn(value string) (string, error) {
switch strings.ToLower(strings.TrimSpace(value)) {
case "sale", "SALE":
return "SALE", nil
case "rent", "RENT":
return "RENT", nil
default:
return "", fmt.Errorf("invalid deal_type")
}
}
func enumDealOut(value string) string {
switch strings.ToUpper(value) {
case "RENT":
return "rent"
default:
return "sale"
}
}
func enumSourceOut(value string) string {
switch strings.ToUpper(value) {
case "BAYUT":
return "bayut"
default:
return "propertyfinder"
}
}
func enumStatusOut(value string) string {
switch strings.ToUpper(value) {
case "REMOVED":
return "removed"
default:
return "active"
}
}
func enumStatusIn(value string) string {
switch strings.ToLower(value) {
case "removed":
return "REMOVED"
default:
return "ACTIVE"
}
}
func timeOut(raw sql.NullString) *string {
if !raw.Valid || strings.TrimSpace(raw.String) == "" {
return nil
}
value := strings.TrimSpace(raw.String)
layouts := []string{
time.RFC3339Nano,
"2006-01-02 15:04:05.999999",
"2006-01-02 15:04:05",
"2006-01-02T15:04:05.999999",
}
for _, layout := range layouts {
if t, err := time.Parse(layout, value); err == nil {
out := t.UTC().Format(time.RFC3339)
return &out
}
}
return &value
}
func nullableString(ns sql.NullString) *string {
if !ns.Valid {
return nil
}
return &ns.String
}
func nullableFloat(nf sql.NullFloat64) *float64 {
if !nf.Valid {
return nil
}
return &nf.Float64
}
func nullableInt(ni sql.NullInt64) *int64 {
if !ni.Valid {
return nil
}
return &ni.Int64
}