Einkauf_Suche/einkauf_suche.py
RochoElLocho f083a15c14 Threads wurden angepasst.
Der Webdriver wird anders initialisiert so dass mehrere diriver
instanzen geöffnet werden.
Code wurde angepasst um doppelten Codes in funktionen zu packen und
sie dann nur aufrufen zu müssen.
2023-11-24 09:51:02 +01:00

601 lines
23 KiB
Python

from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QMessageBox
from concurrent.futures import ThreadPoolExecutor
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import sys
import concurrent.futures
class SearchThread(QThread):
search_finished = pyqtSignal()
def __init__(self, selected_webpages, search_term, parent=None):
super(SearchThread, self).__init__(parent)
self.selected_webpages = selected_webpages
self.search_term = search_term
self.error_message = None
self.drivers = {}
def run(self):
with ThreadPoolExecutor() as executor:
futures = {executor.submit(self.start_search_on_webseite, webseite): webseite for webseite in self.selected_webpages}
for future in concurrent.futures.as_completed(futures):
try:
future.result() # Warte auf das Ende der Suche
except Exception as e:
self.error_message = f"Fehler bei der Suche: {e}"
def initialize_driver(self, webseite):
self.drivers[webseite] = webdriver.Edge()
def start_search_on_webseite(self, webseite):
try:
self.initialize_driver(webseite)
if webseite == "Reichelt":
self.suche_auf_reichelt(webseite)
elif webseite == "Conrad":
self.suche_auf_conrad(webseite)
elif webseite == "Kosatec":
self.suche_auf_kosatec(webseite)
elif webseite == "Hornbach":
self.suche_auf_hornbach(webseite)
elif webseite =="Contorion":
self.suche_auf_contorion(webseite)
elif webseite == "Gastro Teile Shop":
self.suche_auf_gastroteileshop(webseite)
elif webseite == "TiroLED":
self.suche_auf_tiroled(webseite)
elif webseite == "Megabad":
self.suche_auf_megabad(webseite)
elif webseite == "IPS":
self.suche_auf_ips_shop(webseite)
elif webseite == "Brewes":
self.suche_auf_brewes(webseite)
elif webseite == "Delker":
self.suche_auf_delker(webseite)
elif webseite == "Knauss":
self.suche_auf_knauss(webseite)
elif webseite == "Schildershop24":
self.suche_auf_schildershop24(webseite)
elif webseite == "Häfele":
self.suche_auf_haefele(webseite)
elif webseite == "Esmeyer":
self.suche_auf_esmeyer(webseite)
elif webseite == "Papstar":
self.suche_auf_papstar(webseite)
elif webseite == "Lusini":
self.suche_auf_lusini(webseite)
elif webseite == "Hygi":
self.suche_auf_hygi(webseite)
elif webseite == "Schafferer":
self.suche_auf_schafferer(webseite)
elif webseite == "Gastronomie Kaufhaus":
self.suche_auf_gastronomie_kaufhaus(webseite)
elif webseite == "Böttcher":
self.suche_auf_boettcher(webseite)
elif webseite == "Büroshop24":
self.suche_auf_bueroshop24(webseite)
except Exception as e:
print(f"Fehler bei der Suche auf {webseite}: {e}")
def suche_auf_haefele(self,webseite):
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.haefele.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "inputSearchTerm")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "inputSearchTerm")
search_box.clear()
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_brewes(self, webseite):
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.brewes.de/catalogsearch/result?q='
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_conrad(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# Such-URL generieren
search_url = f'https://www.conrad.de/search.html?search={self.search_term}'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "header-search")))
# Suchbegriff eingeben
search_box = driver.find_element(By.ID, "header-search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken
search_box.send_keys(Keys.RETURN)
def suche_auf_contorion(self, webseite):
driver = self.drivers[webseite]
# Such-URL generieren
search_url = f'https://www.contorion.de/suche?q={self.search_term}'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search-input")))
# Suchbegriff eingeben
search_box = driver.find_element(By.ID, "search-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken
search_box.send_keys(Keys.RETURN)
def suche_auf_reichelt(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# Such-URL generieren
search_url = f'https://www.reichelt.de/index.html?ACTION=446&LA=446&nbc=1&SEARCH={self.search_term}'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 60).until(EC.presence_of_element_located((By.ID, "searchbutton")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "quicksearch_new")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Formular abschicken
search_button = self.driver.find_element(By.ID, "searchbutton")
search_button.click()
def suche_auf_kosatec(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# Such-URL generieren
search_url = f'https://www.kosatec.de/suche?query={self.search_term}'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='search']")))
# Suchbegriff eingeben
search_box = driver.find_element(By.CSS_SELECTOR, "input[name='search']")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken
search_box.send_keys(Keys.RETURN)
def suche_auf_hornbach(self,webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.hornbach.de/s/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "global_search")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "global_search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_gastroteileshop(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = f'https://www.gastroteileshop.de/?s={self.search_term}'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
# Warten, bis das Suchfeld sichtbar ist
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, "search-input")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.CLASS_NAME, "search-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_tiroled(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.tiroled.de/de/search'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "search")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_megabad(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.megabad.com/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search-bar-input")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "search-bar-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_ips_shop(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.ips-kts.com/de/suche?query='
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "query")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "query")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
search_box.send_keys(Keys.RETURN)
def suche_auf_delker(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.delker2business.com/nw2017/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "query")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "query")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_knauss(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.knauss.info/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "sSearch")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "sSearch")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_schildershop24(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.schildershop24.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search-query-stichwort_neu")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "search-query-stichwort_neu")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_esmeyer(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.esmeyer-shop.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, "search")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.NAME, "search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_papstar(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.papstar-shop.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "search")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_lusini(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.lusini.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "search-input")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "search-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_hygi(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.hygi.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "qbox")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "qbox")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_schafferer(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.schafferer.de/gastro/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "quicksearch-input")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "quicksearch-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten
search_box.send_keys(Keys.RETURN)
def suche_auf_gastronomie_kaufhaus(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.gastronomie-kaufhaus.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "searchParam")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "searchParam")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten (falls notwendig)
search_box.send_keys(Keys.RETURN)
def suche_auf_boettcher(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.bueromarkt-ag.de/' # Bitte die tatsächliche URL einsetzen
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "autocomplete-0-input")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "autocomplete-0-input")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten (falls notwendig)
search_box.send_keys(Keys.RETURN)
def suche_auf_bueroshop24(self, webseite):
# WebDriver initialisieren
driver = self.drivers[webseite]
# URL für die Suche generieren
search_url = 'https://www.bueroshop24.de/'
# Webseite aufrufen und auf das vollständige Laden warten
driver.get(search_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "searchTerm")))
# Suchbegriff eingeben und Suche starten
search_box = driver.find_element(By.ID, "searchTerm")
search_box.clear() # Falls vorheriger Text vorhanden ist
search_box.send_keys(self.search_term)
# Enter-Taste drücken, um die Suche zu starten (falls notwendig)
search_box.send_keys(Keys.RETURN)
class WebseitenSucheApp(QWidget):
def __init__(self):
super().__init__()
self.selected_webpages = {
"Reichelt": False,
"Conrad": False,
"Kosatec": False,
"Hornbach": False,
"Contorion": False,
"Gastro Teile Shop": False,
"TiroLED": False,
"Megabad": False,
"IPS": False,
"Brewes": False,
"Delker": False,
"Knauss": False,
"Schildershop24": False,
"Häfele": False,
"Esmeyer": False,
"Papstar": False,
"Lusini": False,
"Hygi": False,
"Schafferer": False,
"Gastronomie Kaufhaus": False,
"Böttcher": False,
"Büroshop24": False
}
self.search_term = ""
self.driver = None
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# Webseiten-Auswahl
layout.addWidget(QLabel("Wählen Sie die Webseiten aus:"))
for webpage, checked in self.selected_webpages.items():
checkbox = QCheckBox(webpage)
checkbox.setChecked(checked)
checkbox.stateChanged.connect(lambda state, page=webpage: self.update_webpage_state(page, state))
layout.addWidget(checkbox)
# Suchbegriff-Eingabe
search_label = QLabel("Geben Sie den Suchbegriff ein:")
layout.addWidget(search_label)
search_input = QLineEdit(self)
search_input.textChanged.connect(self.update_search_term)
layout.addWidget(search_input)
# Suchen-Button
search_button = QPushButton("Suche starten", self)
search_button.clicked.connect(self.start_search)
layout.addWidget(search_button)
# Neues Suche-Button
new_search_button = QPushButton("Neue Suche starten", self)
new_search_button.clicked.connect(self.new_search)
layout.addWidget(new_search_button)
self.setLayout(layout)
self.setGeometry(300, 300, 400, 200)
self.setWindowTitle('Webseiten Suche App')
self.show()
def update_webpage_state(self, webpage, state):
self.selected_webpages[webpage] = state == 2 # 2 entspricht dem Zustand "ausgewählt"
def update_search_term(self, text):
self.search_term = text
def start_search(self):
selected_webpages = [webpage for webpage, selected in self.selected_webpages.items() if selected]
if not selected_webpages:
QMessageBox.warning(self, "Fehler", "Bitte wählen Sie mindestens eine Webseite aus.")
return
if not self.search_term:
QMessageBox.warning(self, "Fehler", "Bitte geben Sie einen Suchbegriff ein.")
return
# Starte den Such-Thread
self.search_thread = SearchThread(selected_webpages, self.search_term)
self.search_thread.search_finished.connect(self.search_finished)
self.search_thread.start()
def search_finished(self):
self.search_thread.quit()
self.search_thread.wait()
if hasattr(self.search_thread, 'error_message') and self.search_thread.error_message:
QMessageBox.critical(None, "Fehler", self.search_thread.error_message)
else:
QMessageBox.information(None, "Suche abgeschlossen", "Die Suche wurde erfolgreich abgeschlossen.")
def new_search(self):
self.close_driver()
print("Neue Suche starten")
if __name__ == '__main__':
app = QApplication(sys.argv)
window = WebseitenSucheApp()
sys.exit(app.exec_())