135 lines
3.4 KiB
Go
135 lines
3.4 KiB
Go
package pf
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os/exec"
|
|
"time"
|
|
)
|
|
|
|
type Worker struct {
|
|
python string
|
|
module string
|
|
}
|
|
|
|
type workerError struct {
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
type workerListing struct {
|
|
ListingID int64 `json:"listing_id"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
type BulkResult struct {
|
|
Added int `json:"added"`
|
|
Skipped int `json:"skipped"`
|
|
Errors []string `json:"errors"`
|
|
}
|
|
|
|
type CheckResult struct {
|
|
Changes int `json:"changes"`
|
|
}
|
|
|
|
type Suggestion struct {
|
|
Source string `json:"source"`
|
|
ExternalID string `json:"external_id"`
|
|
URL string `json:"url"`
|
|
Title *string `json:"title"`
|
|
Price *float64 `json:"price"`
|
|
Currency *string `json:"currency"`
|
|
PermitNumber *string `json:"permit_number"`
|
|
AgentName *string `json:"agent_name"`
|
|
AgencyName *string `json:"agency_name"`
|
|
IsActive bool `json:"is_active"`
|
|
}
|
|
|
|
type SuggestionsResponse struct {
|
|
OurPermit *string `json:"our_permit"`
|
|
BayutEnabled bool `json:"bayut_enabled"`
|
|
Suggestions struct {
|
|
PropertyFinder []Suggestion `json:"propertyfinder"`
|
|
Bayut []Suggestion `json:"bayut"`
|
|
} `json:"suggestions"`
|
|
}
|
|
|
|
func NewWorker(cfg Config) *Worker {
|
|
return &Worker{python: cfg.WorkerPython, module: cfg.WorkerModule}
|
|
}
|
|
|
|
func (w *Worker) AddListing(ctx context.Context, projectID int64, url string) (int64, error) {
|
|
var out workerListing
|
|
err := w.call(ctx, "add-listing", map[string]any{"project_id": projectID, "url": url}, &out)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if out.Error != "" {
|
|
return 0, errors.New(out.Error)
|
|
}
|
|
return out.ListingID, nil
|
|
}
|
|
|
|
func (w *Worker) AddListings(ctx context.Context, projectID int64, urls []string) (*BulkResult, error) {
|
|
var out BulkResult
|
|
if err := w.call(ctx, "add-listings", map[string]any{"project_id": projectID, "urls": urls}, &out); err != nil {
|
|
return nil, err
|
|
}
|
|
return &out, nil
|
|
}
|
|
|
|
func (w *Worker) CheckProject(ctx context.Context, projectID int64) (int, error) {
|
|
var out CheckResult
|
|
if err := w.call(ctx, "check-project", map[string]any{"project_id": projectID}, &out); err != nil {
|
|
return 0, err
|
|
}
|
|
return out.Changes, nil
|
|
}
|
|
|
|
func (w *Worker) CheckAll(ctx context.Context) (map[string]int, error) {
|
|
var out map[string]int
|
|
if err := w.call(ctx, "check-all", map[string]any{}, &out); err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (w *Worker) Suggest(ctx context.Context, projectID int64) (*SuggestionsResponse, error) {
|
|
var out SuggestionsResponse
|
|
if err := w.call(ctx, "suggest", map[string]any{"project_id": projectID}, &out); err != nil {
|
|
return nil, err
|
|
}
|
|
return &out, nil
|
|
}
|
|
|
|
func (w *Worker) call(ctx context.Context, command string, payload any, out any) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 15*time.Minute)
|
|
defer cancel()
|
|
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cmd := exec.CommandContext(ctx, w.python, "-m", w.module, command)
|
|
cmd.Stdin = bytes.NewReader(body)
|
|
var stdout, stderr bytes.Buffer
|
|
cmd.Stdout = &stdout
|
|
cmd.Stderr = &stderr
|
|
if err := cmd.Run(); err != nil {
|
|
var apiErr workerError
|
|
if json.Unmarshal(stdout.Bytes(), &apiErr) == nil && apiErr.Error != "" {
|
|
return errors.New(apiErr.Error)
|
|
}
|
|
if stderr.Len() > 0 {
|
|
return errors.New(stderr.String())
|
|
}
|
|
return err
|
|
}
|
|
if err := json.Unmarshal(stdout.Bytes(), out); err != nil {
|
|
return fmt.Errorf("worker json decode failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|