From caadb75187ee7eed056a824de86a83dd4b033d44 Mon Sep 17 00:00:00 2001 From: Arnas Udovic Date: Fri, 20 Jun 2025 19:04:57 +0300 Subject: [PATCH] collector --- .gitignore | 1 + README.md | 2 +- collector.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++++--- db.go | 45 ++++++++++++------- instances.db | Bin 28672 -> 0 bytes main.go | 48 +++++++++++++------- 6 files changed, 182 insertions(+), 38 deletions(-) delete mode 100644 instances.db diff --git a/.gitignore b/.gitignore index 609e690..2f581fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ peertube-instance-index-filter +instances.db diff --git a/README.md b/README.md index cb6631a..689d64b 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ This filter is middleware to collect removed instances and list global index wi App can be called directly and do management in console. -- `peertube-instance-index-filter -command index -host https://instances.joinpeertube.org/api/v1/instances/hosts` - add url to index of hosts. Later it will be used to collect instances +- `peertube-instance-index-filter -command index -url https://instances.joinpeertube.org/api/v1/instances/hosts -instance-url https://instances.joinpeertube.org/api/v1/instances` - add url to index of hosts. Later it will be used to collect instances - `peertube-instance-index-filter -command collect` - collect instances from index of hosts urls - `peertube-instance-index-filter -command reject -host www.example.com` - reject instance from index to exclude it from global index diff --git a/collector.go b/collector.go index 78636b8..6c678b9 100644 --- a/collector.go +++ b/collector.go @@ -1,33 +1,143 @@ // peertube-instance-index-filter // Copyright (C) 2025 Arns Udovič -// +// // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// +// // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -// +// // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package main +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "regexp" + "time" +) + type Instance struct { Url string Data string CreatedAt string + Rejected bool + RejectReason string } -func getNewHosts(url string, lastFetched string, start int, count int) ([]string, error) { - return []string{}, nil +type HostRequest struct { + Total int `json:"total"` + Hosts []Host `json:"data"` } -func fetchInstance(url string, host string) Instance { - return Instance{} +type InstanceRequest struct { + Total int `json:"total"` + Data []json.RawMessage `json:"data"` +} + +type Host struct { + Host string `json:"host"` +} + +func getNewHosts(indexUrl string, lastFetched string, start int, count int) ([]string, error) { + hosts := []string{} + + reqURL, err := url.Parse(indexUrl) + if err != nil { + return nil, fmt.Errorf("error parsing URL: %w", err) + } + + fmt.Println(lastFetched) + query := reqURL.Query() + query.Set("start", fmt.Sprintf("%d", start)) + query.Set("count", fmt.Sprintf("%d", count)) + query.Set("since", lastFetched) + query.Set("sort", "createdAt") + reqURL.RawQuery = query.Encode() + + resp, err := http.Get(reqURL.String()) + if err != nil { + return nil, fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Println(resp) + return nil, fmt.Errorf("HTTP error: %s", resp.Status) + } + + var hostsRequest HostRequest + if err := json.NewDecoder(resp.Body).Decode(&hostsRequest); err != nil { + return nil, fmt.Errorf("error decoding JSON: %w", err) + } + + for _, host := range hostsRequest.Hosts { + hosts = append(hosts, host.Host) + } + + return hosts, nil +} + +func fetchInstance(instanceUrl string, host string) Instance { + instance := Instance{ + Url: host, + CreatedAt: time.Now().Format("2006-01-02T15:04:05Z"), + Rejected: false, + RejectReason: "", + } + + if instanceUrl == "" { + return instance + } + + reqURL, err := url.Parse(instanceUrl) + if err != nil { + panic(err) + } + + query := reqURL.Query() + query.Set("search", host) + reqURL.RawQuery = query.Encode() + + resp, err := http.Get(reqURL.String()) + if err != nil { + panic(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + panic("not OK") + } + + var instanceRequest InstanceRequest + if err := json.NewDecoder(resp.Body).Decode(&instanceRequest); err != nil { + panic(err) + } + + if len(instanceRequest.Data) == 0 { + return instance + } + + instance.Data = string(instanceRequest.Data[0]) + + re := regexp.MustCompile(`"createdAt":"[\d\-T:\.Z]+"`) + results := re.FindAll(instanceRequest.Data[0], -1) + if len(results) > 0 { + re := regexp.MustCompile(`\d{4}[\d\-T:\.Z]+`) + result := re.Find(results[0]) + instance.CreatedAt = string(result) + fmt.Println(instance.CreatedAt) + } + + return instance } diff --git a/db.go b/db.go index 2eb2b62..cbda5bd 100644 --- a/db.go +++ b/db.go @@ -1,16 +1,16 @@ // peertube-instance-index-filter // Copyright (C) 2025 Arns Udovič -// +// // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// +// // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -// +// // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . @@ -18,6 +18,8 @@ package main import ( "database/sql" + "time" + _ "github.com/mattn/go-sqlite3" ) @@ -25,6 +27,13 @@ var migrations = []string{ "SELECT 1;", "ALTER TABLE instances ADD COLUMN rejected INTEGER NOT NULL DEFAULT 0;", "ALTER TABLE instances ADD COLUMN reject_reason TEXT;", + "ALTER TABLE index_host ADD COLUMN instance_url TEXT;", +} + +type IndexHost struct { + Url string + InstanceUrl string + LastFetchedAt string } func connectDB() *sql.DB { @@ -103,35 +112,41 @@ func indexExists(db *sql.DB, url string) (bool, error) { return count > 0, err } -func addIndex(db *sql.DB, url string) { - db.Exec("INSERT INTO index_host (url, last_fetched_at) VALUES (?, '2000-01-01');", url) +func addIndex(db *sql.DB, indexHost IndexHost) { + db.Exec("INSERT INTO index_host (url, instance_url, last_fetched_at) VALUES (?, ?, '2000-01-01');", indexHost.Url, indexHost.InstanceUrl) } func rejectHost(db *sql.DB, host string) { db.Exec("UPDATE instances SET rejected = 1 WHERE url = ?", host) } -func getIndexUrls(db *sql.DB) map[string]string { - urls := make(map[string]string) - rows, err := db.Query("SELECT url, last_fetched_at FROM index_host") +func getIndexHosts(db *sql.DB) []IndexHost { + indexHosts := []IndexHost{} + rows, err := db.Query("SELECT url, instance_url, last_fetched_at FROM index_host") if err != nil { panic(err) } for rows.Next() { - var url string - var lastFetchedAt string - rows.Scan(&url, &lastFetchedAt) - urls[url] = lastFetchedAt + var indexHost IndexHost + rows.Scan(&indexHost.Url, &indexHost.InstanceUrl, &indexHost.LastFetchedAt) + indexHosts = append(indexHosts, indexHost) } - return urls + return indexHosts } -func updateLastFetched(db *sql.DB, url string) { - db.Exec("UPDATE index_host SET last_fetched_at = datetime('now') WHERE url = ?", url) +func updateLastFetched(db *sql.DB, indexHost IndexHost) { + db.Exec("UPDATE index_host SET last_fetched_at = ? WHERE url = ?", time.Now().Add(-24*time.Hour).Format("2006-01-02"), indexHost.Url) } func addInstance(db *sql.DB, instance Instance) { + var count int + db.QueryRow("SELECT COUNT(*) FROM instance WHERE url = ?", instance.Url).Scan(&count) + + if count > 0 { + return + } + db.Exec("INSERT INTO instances (url, data, created_at) VALUES (?, ?, ?);", instance.Url, instance.Data, instance.CreatedAt) } diff --git a/instances.db b/instances.db deleted file mode 100644 index 3c320f18f8af8142b62117b1e47bc614e5ab63f8..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 28672 zcmeI)?N8G{90%~bJ!WKM7+)-8FSxyO4G$aoPTx4ClEu-%>f=I&Y{zr#pb_PHCR3U((~)KO z`H|H>GJ4L)7Th&n^4DQ98*Xz~Yi@K^xq=mSlbtEb+8PUsv#PAB9M7h+rE-~`pK)AOxv&tcR2QzZ>YG z=?PReQBODN*x3_~6~G*XxV*WjX%5{y!zf)9bZ?=nVuQ009U<00Izz00bZa0SG_< z0+$lFE$ -// +// // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// +// // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. -// +// // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . @@ -21,6 +21,7 @@ import ( "fmt" "net/url" "strings" + "time" "github.com/gin-gonic/gin" ) @@ -28,16 +29,20 @@ import ( func main() { var command string var host string + var url string + var instanceUrl string - flag.StringVar(&command, "command", "", "Command to execute") - flag.StringVar(&host, "host", "", "Host for command") + flag.StringVar(&command, "command", "", "Command to execute: index, reject, collect, serve") + flag.StringVar(&host, "host", "", "Host to reject") + flag.StringVar(&url, "url", "", "Url to index hosts") + flag.StringVar(&instanceUrl, "instance-url", "", "Url to fetch instance information") flag.Parse() fmt.Println(command, host) switch command { case "index": - index(host) + index(url, instanceUrl) case "reject": reject(host) case "collect": @@ -47,7 +52,7 @@ func main() { } } -func index(url string) { +func index(url string, instanceUrl string) { db := connectDB() defer db.Close() @@ -57,7 +62,11 @@ func index(url string) { } if !exists { - addIndex(db, url) + indexHost := IndexHost{ + Url: url, + InstanceUrl: instanceUrl, + } + addIndex(db, indexHost) fmt.Println(url, "added to index") } else { fmt.Println(url, "already added") @@ -78,14 +87,15 @@ func collect() { db := connectDB() defer db.Close() - urls := getIndexUrls(db) - for url, lastFetched := range urls { - fmt.Println(url) + indexHosts := getIndexHosts(db) + for _, indexHost := range indexHosts { + fmt.Println(indexHost.Url) + fmt.Println("==========================================") start := 0 count := 20 for { - hosts, err := getNewHosts(url, lastFetched, start, count) + hosts, err := getNewHosts(indexHost.Url, indexHost.LastFetchedAt, start, count) if err != nil { panic(err) } @@ -94,21 +104,29 @@ func collect() { break } + fmt.Println("New hosts:", len(hosts), hosts) for _, host := range hosts { - instance := fetchInstance(url, host) + fmt.Println(host) + + time.Sleep(1 * time.Second) + instance := fetchInstance(indexHost.InstanceUrl, host) addInstance(db, instance) } start += count } - updateLastFetched(db, url) + updateLastFetched(db, indexHost) } } func serve() { r := gin.Default() + r.GET("/", func(c *gin.Context) { + c.String(200, "index") + }) + r.GET("/instances", func(c *gin.Context) { c.String(200, "pong") }) @@ -121,7 +139,7 @@ func serve() { c.String(200, "pong") }) - r.Run(":8080") + r.Run(":8081") } func formatHost(host string) string {