m-chrzan.xyz
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagdalena GrodziƄska <mag.grodzinska@gmail.com>2019-11-24 14:15:45 +0100
committerGitHub <noreply@github.com>2019-11-24 14:15:45 +0100
commitd8c3798d068aa2e128ca7a43a9451ce6d87cc230 (patch)
tree3dfb19b1c834da6e3fd2d0c8fbb18d9fdbd28782
parenta1d9f108cf87cc1164343d79bd982ac89cdb59fc (diff)
parent3b53e5496e9fe9917db7825ceea1455cca75c155 (diff)
Merge pull request #32 from m-chrzan/fix_fetcher_2
Fix fetcher 2
-rw-r--r--src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java131
-rw-r--r--src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini1
-rw-r--r--src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py151
3 files changed, 166 insertions, 117 deletions
diff --git a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
index f7fdab2..1d4a2ce 100644
--- a/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
+++ b/src/main/java/pl/edu/mimuw/cloudatlas/fetcher/Fetcher.java
@@ -2,74 +2,129 @@ package pl.edu.mimuw.cloudatlas.fetcher;
import pl.edu.mimuw.cloudatlas.api.Api;
+import java.rmi.NotBoundException;
+import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.io.*;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import com.google.gson.Gson;
+import pl.edu.mimuw.cloudatlas.model.*;
public class Fetcher {
+ private static final List<String> fetcherAttributeNames = Arrays.asList(
+ "avg_load",
+ "free_disk",
+ "total_disk",
+ "free_ram",
+ "total_ram",
+ "free_swap",
+ "total_swap",
+ "num_processes",
+ "num_cores",
+ "kernel_ver",
+ "logged_users",
+ "dns_names"
+ );
+
+ private static final List<Type.PrimaryType> fetcherAttributeTypes = Arrays.asList(
+ Type.PrimaryType.DOUBLE,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.STRING,
+ Type.PrimaryType.INT,
+ Type.PrimaryType.LIST
+ );
+
+ private static Api api;
+ private static Process pythonProcess;
+
+ private static Value packAttributeValue(Object rawValue, Type.PrimaryType valueType) {
+ Value val = null;
+ ArrayList<Value> contacts = new ArrayList<Value>();
+
+ if (valueType.equals(Type.PrimaryType.STRING)) {
+ val = new ValueString((String) rawValue);
+ } else if (valueType.equals(Type.PrimaryType.INT)) {
+ val = new ValueInt(((Double) rawValue).longValue());
+ } else if (valueType.equals(Type.PrimaryType.DOUBLE)) {
+ val = new ValueDouble((Double) rawValue);
+ } else if (valueType.equals(Type.PrimaryType.LIST)) {
+ for (Object c : (ArrayList) rawValue) {
+ contacts.add(new ValueString((String) c));
+ }
+ val = new ValueList(contacts, TypePrimitive.STRING);
+ } else {
+ throw new UnsupportedOperationException();
+ }
-// private static String processAttribs(String jsonAttribs) {
-// Serializer serializer = new Serializer();
-// return
-// }
+ return val;
+ }
+
+ private static void initializeApiStub() throws RemoteException, NotBoundException {
+ Registry registry = LocateRegistry.getRegistry("localhost");
+ api = (Api) registry.lookup("Api");
+ System.out.println("Fetcher runs with registry");
+ }
- // https://jj09.net/interprocess-communication-python-java/
- private static void fetchData() {
- try {
- Registry registry = LocateRegistry.getRegistry("localhost");
- Api stub = (Api) registry.lookup("Api");
- System.out.println("Fetcher runs with registry"); // TODO
+ private static void initializePythonProcess() throws IOException {
+ String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile();
+ String pythonCmd = "/usr/bin/python3 " + pythonScript;
+ System.out.println("Run cmd: " + pythonCmd);
+ pythonProcess = Runtime.getRuntime().exec(pythonCmd);
+ }
- String pythonScript = Fetcher.class.getResource("data_fetcher.py").getFile();
- String pythonCmd = "/usr/bin/python3 " + pythonScript;
- System.out.println("cmd: " + pythonCmd);
- Process p = Runtime.getRuntime().exec(pythonCmd);
- BufferedReader bufferRead = new BufferedReader( new InputStreamReader(p.getInputStream()));
- BufferedReader errorRead = new BufferedReader( new InputStreamReader(p.getErrorStream()));
+ private static ArrayList deserializeAttribs(String serializedAttribs) {
+ Gson g = new Gson();
+ return g.fromJson(serializedAttribs, ArrayList.class);
+ }
+ // https://jj09.net/interprocess-communication-python-java/
+ private static void fetchData() {
+ BufferedReader bufferRead;
+ ArrayList deserializedAttribs;
+ String jsonAttribs;
- System.out.println("Gonna read some attribs");
- String jsonAttribs = bufferRead.readLine();
- String serializedAttribs;
+ System.out.println(System.getProperty("user.dir"));
- System.out.println("Read some attribs");
- System.out.println(jsonAttribs);
- System.out.println("Got some attribs");
+ try {
+ initializeApiStub();
+ initializePythonProcess();
- ArrayList aa = deserializeAttribs(jsonAttribs);
- System.out.println(aa);
+ bufferRead = new BufferedReader( new InputStreamReader(pythonProcess.getInputStream()));
- // TODO different condition
- while(!jsonAttribs.equals("x")) {
+ while((jsonAttribs = bufferRead.readLine()) != null) {
System.out.println(jsonAttribs);
System.out.flush();
- serializedAttribs = "1";
- // stub.setAttributeValue(serializedAttribs);
- jsonAttribs = bufferRead.readLine();
+ deserializedAttribs = deserializeAttribs(jsonAttribs);
+ for (int i = 0; i < fetcherAttributeNames.size(); i++) {
+ api.setAttributeValue(
+ "/",
+ fetcherAttributeNames.get(i),
+ packAttributeValue(
+ deserializedAttribs.get(i),
+ fetcherAttributeTypes.get(i)));
+ }
}
bufferRead.close();
- } catch(IOException e) {
- e.printStackTrace();
} catch (Exception e) {
System.err.println("Fetcher exception:");
e.printStackTrace();
}
}
-
public static void main(String[] args) {
fetchData();
}
-
- public static ArrayList deserializeAttribs(String serializedAttribs) {
- Gson g = new Gson();
-
- return g.fromJson(serializedAttribs, ArrayList.class);
- }
}
diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini
index 14af0af..56b59c1 100644
--- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini
+++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini
@@ -1,5 +1,6 @@
[AttributeParams]
+InitialDelay=0
CollectionInterval=5
AveragingPeriod=5
AveragingMethod=arithmetic \ No newline at end of file
diff --git a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py
index d69c498..67b2eb4 100644
--- a/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py
+++ b/src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/data_fetcher.py
@@ -1,22 +1,21 @@
import json
-import subprocess
import sys
import time
-
-import schedule
import psutil
import platform
import socket
import configparser
-# import sqlite3
+import statistics
+import urllib.request
# Pipe
pipe = None
# Attribute params
-collectionInterval = 1
-averagingPeriod = 1
-averagingMethod = ""
+initial_delay = 0
+collection_interval = 1
+averaging_period = 1
+averaging_method = ""
"""
Attributes
@@ -37,31 +36,23 @@ averagingMethod = ""
"""
-def setup_database():
- conn = sqlite3.connect('attributes.db')
- c = conn.cursor()
- c.execute('''CREATE TABLE attrib
- (timestamp integer,
- avg_cpu_load integer,
- free_disk integer,
- total_disk integer,
- free_ram integer,
- total_ram integer,
- free_swap integer,
- total_swap integer,
- num_processes integer,
- num_cores integer,
- kernel_ver text,
- logged_users integer,
- dns_names text)
- ''')
- # TODO format dns_names
- conn.commit()
- conn.close()
+def get_avg_load():
+ load_percentages = psutil.cpu_percent(interval=averaging_period, percpu=True)
+
+ if averaging_method == "arithmetic":
+ return statistics.mean(load_percentages)
+ elif averaging_method == "geometric":
+ return statistics.geometric_mean(load_percentages)
+ elif averaging_method == "harmonic":
+ return statistics.harmonic_mean(load_percentages)
+ elif averaging_method == "median":
+ return statistics.median(load_percentages)
+ else:
+ raise RuntimeError("Avg CPU load error")
def get_data():
- avg_cpu_load = psutil.cpu_percent(interval=1) # TODO better?
+ avg_load = get_avg_load()
free_disk = psutil.disk_usage("/").free
total_disk = psutil.disk_usage("/").total
free_ram = psutil.virtual_memory().available
@@ -70,67 +61,69 @@ def get_data():
total_swap = psutil.swap_memory().total
num_processes = len(psutil.pids())
num_cores = psutil.cpu_count(False)
- kernel_ver = platform.release() # TODO ew. version()
+ kernel_ver = platform.release()
logged_users = len(psutil.users())
- hostname = socket.gethostbyaddr("127.0.0.1") # TODO czy dziala
+ external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8')
+ hostname = socket.gethostbyaddr(external_ip)
dns_names = ([hostname[0]] + hostname[1])[:3]
- # https://stackoverflow.com/questions/2575760/python-lookup-hostname-from-ip-with-1-second-timeout
sys.stdout.write("[{},{},{},{},{},{},{},{},{},{},{},{}]\n".format(
- #print("[{},{},{},{},{},{},{},{},{},{},{},{}]\r\n".format(
- avg_cpu_load,
- free_disk,
- total_disk,
- free_ram,
- total_ram,
- free_swap,
- total_swap,
- num_processes,
- num_cores,
- kernel_ver,
- logged_users,
- json.dumps(dns_names))) #.encode()) # TODO ten string
+ avg_load,
+ free_disk,
+ total_disk,
+ free_ram,
+ total_ram,
+ free_swap,
+ total_swap,
+ num_processes,
+ num_cores,
+ kernel_ver,
+ logged_users,
+ json.dumps(dns_names))
+ )
+
sys.stdout.flush()
- # TODO error control and pipe restart
-"""
- conn = sqlite3.connect('attributes.db')
- c = conn.cursor()
- c.execute("INSERT INTO attrib VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )",
- avg_cpu_load,
- free_disk,
- total_disk,
- free_ram,
- total_ram,
- free_swap,
- total_swap,
- num_processes,
- num_cores,
- kernel_ver,
- logged_users,
- dns_names.__str__()) # TODO ten string
- conn.commit()
- conn.close()
-"""
-# TODO funkcja do usuwania zbendych
-def remove_historical_data():
- pass
+def check_averaging_method(averaging_method):
+ averaging_methods = ["arithmetic", "geometric", "harmonic", "mean"]
+ return averaging_method and averaging_method in averaging_methods
-if __name__ == '__main__':
- # config = configparser.ConfigParser()
- # config.read("config.ini")
- collectionInterval = int(5) # int(config["AttributeParams"]["collectionInterval"])
- averagingPeriod = int(5) # int(config["AttributeParams"]["averagingPeriod"])
- averagingMethod = "arithmetic" # config["AttributeParams"]["averagingMethod"]
+def read_config():
+ global initial_delay
+ global collection_interval
+ global averaging_period
+ global averaging_method
+ config = configparser.ConfigParser()
- # setup_database()
+ try:
+ # check if running from source code dir
+ config.read("config.ini")
+ except KeyError:
+ pass
+ else:
+ # we assume that it's running as subprocess from Fetcher.java
+ # we assume working dir to be CloudAtlas root
+ # because gradle seems to put it this way
+ config.read("src/main/resources/pl/edu/mimuw/cloudatlas/fetcher/config.ini")
- # TODO some condition for this?
+ initial_delay = int(config["AttributeParams"]["initialDelay"])
+ collection_interval = int(config["AttributeParams"]["collectionInterval"])
+ averaging_period = int(config["AttributeParams"]["averagingPeriod"])
+ averaging_method = config["AttributeParams"]["averagingMethod"]
+
+ if not check_averaging_method(averaging_method):
+ raise ValueError("Incorrect averaging method")
+ elif collection_interval < averaging_period:
+ raise ValueError("Collection interval smaller than averaging period")
+
+
+if __name__ == '__main__':
+ read_config()
+
+ time.sleep(initial_delay)
while True:
get_data()
- time.sleep(collectionInterval)
-
- # schedule.every(collectionInterval).seconds.do(get_data)
+ time.sleep(collection_interval - averaging_period)