collector

This commit is contained in:
Arnas Udovic 2025-06-20 19:04:57 +03:00
parent 40f68ca46b
commit caadb75187
6 changed files with 182 additions and 38 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
peertube-instance-index-filter
instances.db

View file

@ -8,7 +8,7 @@ This filter is middleware to collect removed instances and list global index wi
App can be called directly and do management in console.
- `peertube-instance-index-filter -command index -host https://instances.joinpeertube.org/api/v1/instances/hosts` - add url to index of hosts. Later it will be used to collect instances
- `peertube-instance-index-filter -command index -url https://instances.joinpeertube.org/api/v1/instances/hosts -instance-url https://instances.joinpeertube.org/api/v1/instances` - add url to index of hosts. Later it will be used to collect instances
- `peertube-instance-index-filter -command collect` - collect instances from index of hosts urls
- `peertube-instance-index-filter -command reject -host www.example.com` - reject instance from index to exclude it from global index

View file

@ -1,33 +1,143 @@
// peertube-instance-index-filter
// Copyright (C) 2025 Arns Udovič <zordsdavini@arns.lt>
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"regexp"
"time"
)
type Instance struct {
Url string
Data string
CreatedAt string
Rejected bool
RejectReason string
}
func getNewHosts(url string, lastFetched string, start int, count int) ([]string, error) {
return []string{}, nil
type HostRequest struct {
Total int `json:"total"`
Hosts []Host `json:"data"`
}
func fetchInstance(url string, host string) Instance {
return Instance{}
type InstanceRequest struct {
Total int `json:"total"`
Data []json.RawMessage `json:"data"`
}
type Host struct {
Host string `json:"host"`
}
func getNewHosts(indexUrl string, lastFetched string, start int, count int) ([]string, error) {
hosts := []string{}
reqURL, err := url.Parse(indexUrl)
if err != nil {
return nil, fmt.Errorf("error parsing URL: %w", err)
}
fmt.Println(lastFetched)
query := reqURL.Query()
query.Set("start", fmt.Sprintf("%d", start))
query.Set("count", fmt.Sprintf("%d", count))
query.Set("since", lastFetched)
query.Set("sort", "createdAt")
reqURL.RawQuery = query.Encode()
resp, err := http.Get(reqURL.String())
if err != nil {
return nil, fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Println(resp)
return nil, fmt.Errorf("HTTP error: %s", resp.Status)
}
var hostsRequest HostRequest
if err := json.NewDecoder(resp.Body).Decode(&hostsRequest); err != nil {
return nil, fmt.Errorf("error decoding JSON: %w", err)
}
for _, host := range hostsRequest.Hosts {
hosts = append(hosts, host.Host)
}
return hosts, nil
}
func fetchInstance(instanceUrl string, host string) Instance {
instance := Instance{
Url: host,
CreatedAt: time.Now().Format("2006-01-02T15:04:05Z"),
Rejected: false,
RejectReason: "",
}
if instanceUrl == "" {
return instance
}
reqURL, err := url.Parse(instanceUrl)
if err != nil {
panic(err)
}
query := reqURL.Query()
query.Set("search", host)
reqURL.RawQuery = query.Encode()
resp, err := http.Get(reqURL.String())
if err != nil {
panic(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
panic("not OK")
}
var instanceRequest InstanceRequest
if err := json.NewDecoder(resp.Body).Decode(&instanceRequest); err != nil {
panic(err)
}
if len(instanceRequest.Data) == 0 {
return instance
}
instance.Data = string(instanceRequest.Data[0])
re := regexp.MustCompile(`"createdAt":"[\d\-T:\.Z]+"`)
results := re.FindAll(instanceRequest.Data[0], -1)
if len(results) > 0 {
re := regexp.MustCompile(`\d{4}[\d\-T:\.Z]+`)
result := re.Find(results[0])
instance.CreatedAt = string(result)
fmt.Println(instance.CreatedAt)
}
return instance
}

45
db.go
View file

@ -1,16 +1,16 @@
// peertube-instance-index-filter
// Copyright (C) 2025 Arns Udovič <zordsdavini@arns.lt>
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
@ -18,6 +18,8 @@ package main
import (
"database/sql"
"time"
_ "github.com/mattn/go-sqlite3"
)
@ -25,6 +27,13 @@ var migrations = []string{
"SELECT 1;",
"ALTER TABLE instances ADD COLUMN rejected INTEGER NOT NULL DEFAULT 0;",
"ALTER TABLE instances ADD COLUMN reject_reason TEXT;",
"ALTER TABLE index_host ADD COLUMN instance_url TEXT;",
}
type IndexHost struct {
Url string
InstanceUrl string
LastFetchedAt string
}
func connectDB() *sql.DB {
@ -103,35 +112,41 @@ func indexExists(db *sql.DB, url string) (bool, error) {
return count > 0, err
}
func addIndex(db *sql.DB, url string) {
db.Exec("INSERT INTO index_host (url, last_fetched_at) VALUES (?, '2000-01-01');", url)
func addIndex(db *sql.DB, indexHost IndexHost) {
db.Exec("INSERT INTO index_host (url, instance_url, last_fetched_at) VALUES (?, ?, '2000-01-01');", indexHost.Url, indexHost.InstanceUrl)
}
func rejectHost(db *sql.DB, host string) {
db.Exec("UPDATE instances SET rejected = 1 WHERE url = ?", host)
}
func getIndexUrls(db *sql.DB) map[string]string {
urls := make(map[string]string)
rows, err := db.Query("SELECT url, last_fetched_at FROM index_host")
func getIndexHosts(db *sql.DB) []IndexHost {
indexHosts := []IndexHost{}
rows, err := db.Query("SELECT url, instance_url, last_fetched_at FROM index_host")
if err != nil {
panic(err)
}
for rows.Next() {
var url string
var lastFetchedAt string
rows.Scan(&url, &lastFetchedAt)
urls[url] = lastFetchedAt
var indexHost IndexHost
rows.Scan(&indexHost.Url, &indexHost.InstanceUrl, &indexHost.LastFetchedAt)
indexHosts = append(indexHosts, indexHost)
}
return urls
return indexHosts
}
func updateLastFetched(db *sql.DB, url string) {
db.Exec("UPDATE index_host SET last_fetched_at = datetime('now') WHERE url = ?", url)
func updateLastFetched(db *sql.DB, indexHost IndexHost) {
db.Exec("UPDATE index_host SET last_fetched_at = ? WHERE url = ?", time.Now().Add(-24*time.Hour).Format("2006-01-02"), indexHost.Url)
}
func addInstance(db *sql.DB, instance Instance) {
var count int
db.QueryRow("SELECT COUNT(*) FROM instance WHERE url = ?", instance.Url).Scan(&count)
if count > 0 {
return
}
db.Exec("INSERT INTO instances (url, data, created_at) VALUES (?, ?, ?);", instance.Url, instance.Data, instance.CreatedAt)
}

Binary file not shown.

48
main.go
View file

@ -1,16 +1,16 @@
// peertube-instance-index-filter
// Copyright (C) 2025 Arns Udovič <zordsdavini@arns.lt>
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
@ -21,6 +21,7 @@ import (
"fmt"
"net/url"
"strings"
"time"
"github.com/gin-gonic/gin"
)
@ -28,16 +29,20 @@ import (
func main() {
var command string
var host string
var url string
var instanceUrl string
flag.StringVar(&command, "command", "", "Command to execute")
flag.StringVar(&host, "host", "", "Host for command")
flag.StringVar(&command, "command", "", "Command to execute: index, reject, collect, serve")
flag.StringVar(&host, "host", "", "Host to reject")
flag.StringVar(&url, "url", "", "Url to index hosts")
flag.StringVar(&instanceUrl, "instance-url", "", "Url to fetch instance information")
flag.Parse()
fmt.Println(command, host)
switch command {
case "index":
index(host)
index(url, instanceUrl)
case "reject":
reject(host)
case "collect":
@ -47,7 +52,7 @@ func main() {
}
}
func index(url string) {
func index(url string, instanceUrl string) {
db := connectDB()
defer db.Close()
@ -57,7 +62,11 @@ func index(url string) {
}
if !exists {
addIndex(db, url)
indexHost := IndexHost{
Url: url,
InstanceUrl: instanceUrl,
}
addIndex(db, indexHost)
fmt.Println(url, "added to index")
} else {
fmt.Println(url, "already added")
@ -78,14 +87,15 @@ func collect() {
db := connectDB()
defer db.Close()
urls := getIndexUrls(db)
for url, lastFetched := range urls {
fmt.Println(url)
indexHosts := getIndexHosts(db)
for _, indexHost := range indexHosts {
fmt.Println(indexHost.Url)
fmt.Println("==========================================")
start := 0
count := 20
for {
hosts, err := getNewHosts(url, lastFetched, start, count)
hosts, err := getNewHosts(indexHost.Url, indexHost.LastFetchedAt, start, count)
if err != nil {
panic(err)
}
@ -94,21 +104,29 @@ func collect() {
break
}
fmt.Println("New hosts:", len(hosts), hosts)
for _, host := range hosts {
instance := fetchInstance(url, host)
fmt.Println(host)
time.Sleep(1 * time.Second)
instance := fetchInstance(indexHost.InstanceUrl, host)
addInstance(db, instance)
}
start += count
}
updateLastFetched(db, url)
updateLastFetched(db, indexHost)
}
}
func serve() {
r := gin.Default()
r.GET("/", func(c *gin.Context) {
c.String(200, "index")
})
r.GET("/instances", func(c *gin.Context) {
c.String(200, "pong")
})
@ -121,7 +139,7 @@ func serve() {
c.String(200, "pong")
})
r.Run(":8080")
r.Run(":8081")
}
func formatHost(host string) string {