Pobranie dostępnych ilości produktu w danym magazynie
Załóżmy ze nasz sklep internetowy/dział handlowy potzrebuje DOSTĘPNYCH ilości produktów.
W WMS EasyStorage mamy następująca logike:
Magazyn (identyfikowamy po warehouseid) oraz zamówienia, które są realzowane z danego magazynu.
Aby zrealizować dane zamówienie WMS generuje zadania operatorów ("Zbierz" i "uzupełnij"), oraz raportuje braki produktów. Przez API możemy pobrać dwie dane:
Stan produktów w danym magazynie
Liczbę produktów niezbędnych do zrezliowania zamówień z magazynu
Rożnica pomiedzy tymi dwoma wartościami jest to liczba dostępnych/wolnych produktów
Przykład: W magazynie mamy 3 sztuki towaru (na półkach). Jedno zamówienie (Z1) z 1 szt jest już skompleowane (produkt jest w koszyku), a drugie (Z2) na 2szt jest w statusie zaalkowane (czeka na zebranie). Zamówienie Z1 ponieważ zostało już zebrane, WMS to zamówienie ignoruje i końcowy stan produktów w magazynie jest 3. Liczba produktów niezbędnych do realizacji to 2 szt (zam Z2). Liczba dostępnych produktów po obliczeniu różnicy: (3 -2)=1. Gdy Z2 będzie na 6szt. to liczba dostępnych produktów wyniesie (3-6)=-3 (czyli brakuje 3 sztuk w magazynie do realizacji zamówień)
Poniżej przykład skryptu (Python3) pobierający produkty oraz ich stany, używający 3 endpointów:
fetch_inventory_data(): Pobieranie stanu magazynowego (system zwraca nam ID produktu oraz stan), uzywany endpoint: /api/external/v2/inventory/getinventorysummaryperwarehouse z filtrem na ID magazynu
fetch_product_data(): Pobieranie i uzupełnianie brakujących danych o produktach - majac ID produktu, "dociągamy" dane o produktach takie jak SKU, Nazwa, EAN (uzywany enpoint: /api/external/v2/stock/product/getproducts)
load_local_product_data() i save_local_product_data(): Zarządzanie lokalnym przechowywaniem danych o produktach - pamietajać ze API ma limit (100 zapytań na minutę, zachowujemy dane o produktach w pliku)
save_to_csv() i save_to_xlsx(): Zapisywanie pobranych danych do plików w formatach CSV i XLSX. W tych funkcjach obliczamy dostępne ilości danych produktów.
Skrypt obsługuje paging (API zwraca maksymlanie 1000 pozycji na stronie dla stanu magazynu, czyli endpoint /api/external/v2/inventory/getinventorysummaryperwarehouse oraz 100 pozycji przy pobieraniu danych o produktach, endpoint: /api/external/v2/stock/product/getproducts
Uruchamianie
Aby uruchomić skrypt, upewnij się, że wszystkie wymagane biblioteki są zainstalowane, a następnie wykonaj następującą komendę:
python nazwapliku.py
Upewnij się również, że zmienne konfiguracyjne, takie jak BASE_URL, LOGIN, PASSWORD, i WAREHOUSE_ID, są poprawnie ustawione.
Nazwy Magazynów w ERP i WMS mogą się różnić, aby pobrać ID magazynów dostępne w WMS należy użyć endpointa: /api/external/v2/warehouse/getwarehouses (nie używany w poniższym skrypcie, ID zostało wpisane do zmiennych na początku skryptu)
import os
import requests
import csv
import time
import json
from openpyxl import Workbook
# Zmienne konfiguracyjne
BASE_URL = "https://tenantfirmy.easystorage.io"
LOGIN = "usernameapi"
PASSWORD = "PasswordToType"
WAREHOUSE_ID = "137c5cb3-d9c5-40f5-2bff-08da76af3868"
def get_token():
url = f"{BASE_URL}/api/external/v2/account/gettoken"
payload = {
'username': LOGIN,
'password': PASSWORD,
}
response = requests.post(url, json=payload)
response.raise_for_status()
return response.json()['data']['user']['accessToken']
def fetch_inventory_data(token, warehouse_id):
url = f"{BASE_URL}/api/external/v2/inventory/getinventorysummaryperwarehouse"
headers = {'Authorization': f'Bearer {token}'}
offset = 0
all_items = []
while True:
payload = {
'offset': offset,
'limit': 1000,
'filter': {
'warehouse': {'id': warehouse_id},
},
'paging': {
'page': offset // 1000 + 1,
'rowsPerPage': 1000,
}
}
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
inventory = response.json().get("data", {}).get("inventory", [])
if not inventory:
break
print("stan magazynu - produkty od", offset+1," do ", offset+1000)
all_items.extend(inventory)
offset += 1000
return all_items
def load_local_product_data(file_path):
"""Wczytuje dane o produktach z pliku JSON."""
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_local_product_data(file_path, product_map):
"""Zapisuje dane o produktach do pliku JSON."""
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(product_map, f, ensure_ascii=False, indent=4)
def fetch_product_data(token, product_ids, local_data_file='product_data.json'):
"""Pobiera dane produktów, korzystając z lokalnych danych, jeśli dostępne."""
# Wczytujemy dane o produktach z lokalnego pliku
local_data = load_local_product_data(local_data_file)
batch_size = 100
total_products = len(product_ids)
product_map = local_data # Zaczynamy od danych lokalnych
# Sprawdzamy, które produkty trzeba pobrać
missing_product_ids = [pid for pid in product_ids if pid not in product_map]
if missing_product_ids:
print(f"Pobieranie danych dla {len(missing_product_ids)} brakujących produktów.")
for i in range(0, len(missing_product_ids), batch_size):
if i%100 == 0:
print ("dane produkty od",i+1,"do",i+batch_size)
batch = missing_product_ids[i:i + batch_size]
payload = {
"paging": {
"page": 1,
"rowsPerPage": batch_size
},
"filter": {
"ids": batch
}
}
headers = {'Authorization': f'Bearer {token}'}
try:
response = requests.post(f"{BASE_URL}/api/external/v2/stock/product/getproducts", json=payload, headers=headers)
response.raise_for_status()
response_data = response.json()
# Dodajemy produkty do mapy
for product in response_data.get('data', {}).get('products', []):
product_map[product['id']] = product
except requests.exceptions.RequestException as e:
print(f"Błąd podczas pobierania produktów: {str(e)}")
continue
time.sleep(1) # Dodajemy opóźnienie, aby nie przekroczyć limitów API
# Zapisujemy zaktualizowane dane lokalnie
save_local_product_data(local_data_file, product_map)
return product_map
def save_to_csv(items, product_details):
with open('inventory_data.csv', 'w', newline='',encoding='utf-8') as csvfile:
fieldnames = ['product_id', 'code', 'barcode', 'name', 'quantityInAllZones', 'quantityToPick', 'quantity_aval']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for item in items:
product_id = item.get('product', {}).get('id', '')
product_info = product_details.get(product_id, {})
quantity_all = item.get('quantityInAllZones', 0)
quantity_pick = item.get('quantityToPick', 0)
quantity_aval = quantity_all - quantity_pick
writer.writerow({
'product_id': product_id,
'code': product_info.get('code', ''),
'barcode': product_info.get('barcode', ''),
'name': product_info.get('name', ''),
'quantityInAllZones': quantity_all,
'quantityToPick': quantity_pick,
'quantity_aval': quantity_aval,
})
print("Dane zapisane do pliku inventory_data.csv")
def save_to_xlsx(items, product_details):
wb = Workbook()
ws = wb.active
ws.title = "Inventory Data"
headers = ['product_id', 'code', 'barcode', 'name', 'quantityInAllZones', 'quantityToPick', 'quantity_aval']
ws.append(headers)
for item in items:
product_id = item.get('product', {}).get('id', '')
product_info = product_details.get(product_id, {})
quantity_all = item.get('quantityInAllZones', 0)
quantity_pick = item.get('quantityToPick', 0)
quantity_aval = quantity_all - quantity_pick
ws.append([
product_id,
product_info.get('code', ''),
product_info.get('barcode', ''),
product_info.get('name', ''),
quantity_all,
quantity_pick,
quantity_aval
])
wb.save("inventory_data.xlsx")
print("Dane zapisane do pliku inventory_data.xlsx")
def main():
token = get_token()
items = fetch_inventory_data(token, WAREHOUSE_ID)
product_ids = [item['product']['id'] for item in items if 'product' in item]
product_details = fetch_product_data(token, product_ids)
save_to_csv(items, product_details)
save_to_xlsx(items, product_details)
if __name__ == '__main__':
main()