From c0e566e1e66ab27892c33a0378416b8cfdef1705 Mon Sep 17 00:00:00 2001 From: Martin Delcourt <martin.delcourt@uclouvain.be> Date: Thu, 7 Nov 2019 14:06:46 +0100 Subject: [PATCH] Adding Ch8 solutions and histogram utility --- Chapter_8/buffer.py | 60 +++++++++++ Chapter_8/exam_simul.py | 69 ++++++++++++ Chapter_8/jeu_2048.py | 216 ++++++++++++++++++++++++++++++++++++++ Chapter_8/martingale.py | 52 +++++++++ Chapter_8/tp_lphys1201.py | 155 +++++++++++++++++++++++++++ setup.sh | 1 + utilities/histogram.py | 156 +++++++++++++++++++++++++++ 7 files changed, 709 insertions(+) create mode 100644 Chapter_8/buffer.py create mode 100644 Chapter_8/exam_simul.py create mode 100644 Chapter_8/jeu_2048.py create mode 100644 Chapter_8/martingale.py create mode 100644 Chapter_8/tp_lphys1201.py create mode 100755 setup.sh create mode 100644 utilities/histogram.py diff --git a/Chapter_8/buffer.py b/Chapter_8/buffer.py new file mode 100644 index 0000000..497d684 --- /dev/null +++ b/Chapter_8/buffer.py @@ -0,0 +1,60 @@ +from random import random + +class detector: + def __init__(self, depth, read_time = 8): + self.events = [] + self.depth = depth + self.read_time = read_time + + + def read_buffer(self,time): + if len(self.events) > 0 and self.events[0] < time: + self.events = self.events[1:] + return True + else: + return False + + def add_event(self,time): + # First, clear any event that could be read + while(self.read_buffer(time)): + pass + #self.read_buffer(time) + + #Then, try to add event at the end of queue + if len(self.events) < self.depth: + if len(self.events) == 0: + self.events.append(time+self.read_time-1) + else: + self.events.append(self.events[-1]+self.read_time) + + return True + else: + return False + + + +def get_eff(proba, depth, read_time = 8, total_time = 1e5): + my_det = detector(depth, read_time) + n_accept = 0 + n_total = 0 + for time in range(int(total_time)): + if random() < proba: + n_total += 1 + if my_det.add_event(time): + n_accept += 1 + return n_accept/n_total + +def scan_eff(proba, threshold = 0.98, read_time = 8): + queue_depth = 1 + detector_eff = get_eff(proba,1, read_time) + print("If no queue and p = {}% : eff = {:3f}%".format(100*proba, 100*detector_eff)) + while (detector_eff < threshold and queue_depth < 50): + queue_depth += 1 + detector_eff = get_eff(proba,queue_depth, read_time) + print("If {} events queue and p = {} % : eff = {:3f}%".format(queue_depth,100*proba,100*detector_eff)) + + +scan_eff(0.1) +scan_eff(0.125) +scan_eff(0.05) +scan_eff(1) diff --git a/Chapter_8/exam_simul.py b/Chapter_8/exam_simul.py new file mode 100644 index 0000000..c65e1bc --- /dev/null +++ b/Chapter_8/exam_simul.py @@ -0,0 +1,69 @@ +from histogram import Histogram + + +def simul_one_year(source, n_students): + new_hist = Histogram(21,-0.5,20.5) + new_hist.fill_random(source,n_students) + return new_hist + + +def get_success_rate(histogram): + return histogram.get_integral(9,20)*1./histogram.get_integral() + +results_career = Histogram() +results_career.load("results_career.csv") + +results_one_year = Histogram() +results_one_year.load("results_one_year.csv") + + +# First question : +one_year_av = results_one_year.get_average() +one_year_sr = get_success_rate(results_one_year) + +n_worse_av = 0 +n_worse_sr = 0 +n_tries = 5000 +for i in range(n_tries): + ss = simul_one_year(results_career,30) + av = ss.get_average() + sr = get_success_rate(ss) + + if av < one_year_av: + n_worse_av +=1 + if sr < one_year_sr: + n_worse_sr +=1 + +print("Worse average in {} % of simulations.".format(100*n_worse_av*1./n_tries)) +print("Worse success rate in {} % of simulations.".format(100*n_worse_sr*1./n_tries)) + + + +# Second question : + +n_worse_career_av = 0 +n_worse_career_sr = 0 +n_tries = 1000 +for test in range(n_tries): + career_worse_av = 20 + career_worse_sr = 1 + for year in range(30): + ss = simul_one_year(results_career,30) + av = ss.get_average() + sr = get_success_rate(ss) + if av < career_worse_av: + career_worse_av = av + if sr < career_worse_sr: + career_worse_sr = sr + + if career_worse_av < one_year_av: + n_worse_career_av += 1 + + if career_worse_sr < one_year_sr: + n_worse_career_sr += 1 + + +print("Worse average in {} % of careers.".format(100*n_worse_career_av*1./n_tries)) +print("Worse success rate in {} % of careers.".format(100*n_worse_career_sr*1./n_tries)) + + diff --git a/Chapter_8/jeu_2048.py b/Chapter_8/jeu_2048.py new file mode 100644 index 0000000..421c26f --- /dev/null +++ b/Chapter_8/jeu_2048.py @@ -0,0 +1,216 @@ +import random +import copy +from curtsies import Input + +class tile: + def __init__(self,x = 0, y = 0, value = 0): + self.x = x + self.y = y + self.value = value + + def __eq__(self,other): + if isinstance(other,tile): + return other.x == self.x and other.y == self.y + else: + try: + return other[0] == self.x and other[1] == self.y + except: + return NotImplemented + def __repr__(self): + return("({},{}) : {} ".format(self.x,self.y,self.value)) + +class grid: + def __init__(self, size = 4, input_method = ""): + self._tiles = [] + self.size = size + self.add_new_tile() + self.add_new_tile() + self.input_method = input_method + if self.input_method == "": + self.input_method = "keyboard" + + + def score(self): + ss = 0 + for t in self._tiles: + ss += t.value + return ss + + def get_random(self): + non_empty = [] + for i in range(self.size): + for j in range(self.size): + if not (i,j) in self._tiles: + non_empty.append(tile(i,j)) + if non_empty == []: + print("Grid filled...") + return tile(-1,-1) + return random.choice(non_empty) + + def add_new_tile(self): + new_tile = self.get_random() + if new_tile.x < 0: + return -1 + if random.random() > 0.9: + new_tile.value = 4 + else: + new_tile.value = 2 + self._tiles.append(new_tile) + return 1 + + def __repr__(self): + txt = "" + for line in range(self.size): + txt += ((7*self.size+1)*"-"+"\n") + for column in range(self.size): + txt+="|" + found_tile = False + for t in self._tiles: + if t == (line,column): + txt+=str(t.value).center(6) + found_tile = True + if not found_tile: + txt+=6*" " + txt+="|\n" + + txt += ((7*self.size+1)*"-"+"\n") + return txt + + + def get_lines(self,direction): + lines = [] + for line in range(self.size): + line_tiles = [] + for column in range(self.size): + for t in self._tiles: + if t == (line,column): + line_tiles.append(t) + + line_tiles=sorted(line_tiles,key=lambda t : direction*t.y) + lines.append(line_tiles) + return lines + + def get_columns(self,direction): + columns = [] + for column in range(self.size): + col_tiles = [] + for line in range(self.size): + for t in self._tiles: + if t == (line,column): + col_tiles.append(t) + + col_tiles=sorted(col_tiles,key=lambda t : direction*t.x) + columns.append(col_tiles) + return columns + + + def shift(self, collections, attr, direction, default): + has_change = False + for line in collections: + for (i,t) in enumerate(line): + if i == 0: + if getattr(t,attr) != default: + setattr(t,attr,default) + has_change = True + else: + if line[i-1].value == t.value: + line[i-1].value *= 2 + setattr(t,attr,getattr(line[i-1],attr)) + t.value = -1 + has_change = True + else: + if getattr(t,attr) != getattr(line[i-1],attr) + direction: + setattr(t,attr,getattr(line[i-1],attr) + direction) + has_change = True + for i in reversed(range(len((self._tiles)))): + if self._tiles[i].value < 0: + del self._tiles[i] + return has_change + + + def shift_left(self): + return self.shift(self.get_lines(+1), "y", +1, 0) + + def shift_right(self): + return self.shift(self.get_lines(-1), "y", -1, self.size-1) + + def shift_up(self): + return self.shift(self.get_columns(+1), "x", +1, 0) + + def shift_down(self): + return self.shift(self.get_columns(-1), "x", -1, self.size-1) + + + + def play(self, action = "", silent = False): + if not silent: + print(self) + + if action == "": + action = self.get_input() + if action == "esc": + return -1 + + has_change = False + if action == "z": + has_change = self.shift_up() + elif action == "s": + has_change = self.shift_down() + elif action == "q": + has_change = self.shift_left() + elif action == "d": + has_change = self.shift_right() + if not has_change: + move_possible = False + new_game = copy.deepcopy(self) + move_possible += new_game.shift_up() + new_game = copy.deepcopy(self) + move_possible += new_game.shift_down() + new_game = copy.deepcopy(self) + move_possible += new_game.shift_left() + new_game = copy.deepcopy(self) + move_possible += new_game.shift_right() + if not move_possible: + if not silent: + print("No new move possible ! You have lost the game !") + return -1 + else: + if not silent: + print("You are not blocked... Please try another move...") + return 0 + + if self.add_new_tile() < 0: + raise TypeError("Unable to fill the grid... This state shouldn't be reached...") + + return 1 + + def get_input(self): + if self.input_method == "keyboard": + ii = "" + with Input(keynames='curses') as input_generator: + for e in input_generator: + ii = str(e) + if ii in "zqsd": + return ii + elif e == "KEY_UP": + return "z" + elif e == "KEY_DOWN": + return "s" + elif e == "KEY_LEFT": + return "q" + elif e =="KEY_RIGHT": + return "d" + else: + if e=="\x1b": + return "esc" + else: + ii = input("Action ? ") + while not ii in ["z","s","q","d","esc"]: + ii = input("Action ?") + return ii + +if __name__=="__main__": + gg = grid(6) + while gg.play() > -1: + print("Score = {}".format(gg.score())) + diff --git a/Chapter_8/martingale.py b/Chapter_8/martingale.py new file mode 100644 index 0000000..b481988 --- /dev/null +++ b/Chapter_8/martingale.py @@ -0,0 +1,52 @@ +from random import random + + +class game: + def __init__(self, initial_stack = 1000): + self.initial_stack = initial_stack + self.money = initial_stack + self.toBet = 1 + self.finished = False + + def play(self): + win = random() < 18/37 + if win: + self.money += self.toBet + if self.money >= self.initial_stack + 50: + self.finished = True + else: + self.money -= self.toBet + self.toBet *=2 + if self.toBet > self.money: + self.finished = True + +def get_prob_winning(initial_stack, ntries = 100000): + nWin = 0 + money = 0 + + for i in range(ntries): + g = game(initial_stack) + while not g.finished: + g.play() + nWin += (g.money >= initial_stack+50) + money += g.money - initial_stack + + return (nWin/ntries,money/ntries) + + +ww = get_prob_winning(1000) +print("If you play with a 1000€ stack, you loose in {}% of the cases, leading to a {}€ average loss".format(100*(1-ww[0]),-ww[1])) + + +stack = 0 +threshold = .99 + +for i in reversed(range(0,16)): + stack += 1<<i + prob = get_prob_winning(stack)[0] + print("Stack = {} ---> prob = {} ".format(stack,prob)) + if prob >= threshold: + stack -= 1<<i + +print("Stack to have {}% chance of winning has to be {}... Suspiciouly close to 32768...".format(100*threshold,stack)) +print("You will therefore loose on average {} euros".format(-get_prob_winning(stack)[1])) diff --git a/Chapter_8/tp_lphys1201.py b/Chapter_8/tp_lphys1201.py new file mode 100644 index 0000000..f78fa5c --- /dev/null +++ b/Chapter_8/tp_lphys1201.py @@ -0,0 +1,155 @@ +import random, sys + + +class student: + uniqueId = 0 + + def __init__(self, average_question_time): + self.id = student.uniqueId + self.n_answered = 0 + student.uniqueId += 1 + + self.next_question_time = -1 + self.waiting_time = 0 + self.prob_question = 1/average_question_time + self.get_next_question(0) + + def get_next_question(self, time): + self.next_question_time = time + random.expovariate(self.prob_question) + + def __repr__(self): + return("Next question at : {}".format(self.next_question_time)) + + def __gt__(self,other): + return self.next_question_time > other.next_question_time + def __ge__(self,other): + return self.next_question_time >= other.next_question_time + def __lt__(self,other): + return self.next_question_time < other.next_question_time + def __le__(self,other): + return self.next_question_time <= other.next_question_time + +class teacher: + uniqueId = 0 + def __init__(self, alpha, beta): + self.n_answered = 0 + self.id = teacher.uniqueId + teacher.uniqueId += 1 + self.alpha = alpha + self.beta = beta + self.time_busy = 0 + self.total_working_time = 0 + + + def get_answer_time(self, time): + solve_time = time + random.weibullvariate(self.alpha,self.beta) + self.time_busy = solve_time + self.total_working_time += solve_time - time + return solve_time + + def is_busy(self,time): + return time < self.time_busy + + def __repr__(self): + return("Will be busy until {}".format(self.time_busy)) + + def __gt__(self,other): + return self.time_busy > other.time_busy + def __ge__(self,other): + return self.time_busy >= other.time_busy + def __lt__(self,other): + return self.time_busy < other.time_busy + def __le__(self,other): + return self.time_busy <= other.time_busy + + +def simulate(n_students, n_teachers, av_student = 45, alpha = 3, beta = 1.6): + teachers = [ teacher(alpha,beta) for i in range(n_teachers) ] + students = [ student(av_student) for i in range(n_students) ] + + time = 0 + while time < 120: + students.sort() + teachers.sort() + + for t in teachers: + if not t.is_busy(time): + s = students[0] + start_help = max(time,s.next_question_time) + s.waiting_time += start_help - s.next_question_time + + solve_time = t.get_answer_time(start_help) + s.get_next_question(solve_time) + + t.n_answered += 1 + s.n_answered +=1 + + break + + + teachers.sort() + time = teachers[0].time_busy + + average_waiting_time = 0 + average_questions_answered = 0 + for s in students: + average_waiting_time += s.waiting_time + average_questions_answered += s.n_answered + + average_time_worked = 0 + for t in teachers: + average_time_worked += t.total_working_time + + return (average_waiting_time/n_students, average_questions_answered/n_students, average_time_worked/n_teachers,time) + + +def simulate_many(n_simul,n_students, n_teachers, av_student = 45, alpha = 6, beta = 1.6, silent = False): + if not silent: + print(20*"*") + print(20*"*") + print("Simulating classroom with :") + print("{} students with a question every {} min on average".format(n_students,av_student)) + print("{} teachers answering question with alpha = {} and beta = {}".format(n_teachers,alpha,beta)) + print("These parameters result in a mode of {:0.2f}m per question".format(alpha*(1-1/beta)**(1/beta))) + data = [0,0,0,0] + for i in range(n_simul): + simul_results = simulate(n_students, n_teachers, av_student, alpha, beta) + data[0] += simul_results[0] + data[1] += simul_results[1] + data[2] += simul_results[2] + data[3] += simul_results[3] + + data = [x/n_simul for x in data] + if not silent: + print("After {} simulations, the following results were found:".format(n_simul)) + print("An average of {:0.2f} questions were answered after waiting {:0.2f} minutes.".format(data[1],data[0])) + print("The teachers were busy {:0.2f}% of the time".format(100*data[2]/data[3])) + print("A total of {:0.2f} questions were answered".format(data[1]*n_students)) + return data[0] + + +def sweep_alpha(n_students, wait_thresh, n_simul = 1000, n_teachers = 2, av_student = 45, beta = 1.6): + alpha_max = 16 + alpha = 0 + for i in range(10): + alpha_step = alpha_max/(1<<i) + alpha += alpha_step + + if simulate_many(n_simul, n_students, n_teachers, av_student, alpha, beta, silent = True) > wait_thresh: + alpha -= alpha_step + + print("alpha = {}".format(alpha)) + simulate_many(n_simul, n_students, n_teachers, av_student, alpha, beta) + + +simulate_many(10000 , 13 , 2 , 40 ) + +simulate_many(10000 , 26 , 2 , 40 ) + +sweep_alpha(26,2) + +sweep_alpha(26,1) + + + + diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..a49c441 --- /dev/null +++ b/setup.sh @@ -0,0 +1 @@ +export PYTHONPATH=`pwd`/utilities:$PYTHONPATH diff --git a/utilities/histogram.py b/utilities/histogram.py new file mode 100644 index 0000000..9f58662 --- /dev/null +++ b/utilities/histogram.py @@ -0,0 +1,156 @@ +import numpy as np +import matplotlib.pyplot as plt +import random, csv + +class Histogram: + def __init__(self, n_bins = 0, a = 0, b = 0): + """Crée un histograme de n_bins réparties uniformément entre a et b""" + self._data = [0 for i in range(n_bins)] + self._n_bins = n_bins + self._a = a + self._b = b + self._cached_integral = None + + def fill(self, x): + """Remplis la bin correspondant à la valeur 'x'""" + self._cached_integral = None + bin_number = int((x-self._a)*self._n_bins/(self._b-self._a)) + if bin_number >= 0 and bin_number < self._n_bins: + self._data[bin_number] += 1 + return bin_number + else: + print("Warning, {} out of bounds.".format(x)) + return -1 + + def get_maximum(self): + """Retourne le maximum de l'histogramme""" + maximum = self._data[0] + for x in self._data[1:]: + if x > maximum: + maximum = x + return maximum + + def get_bin_center(self,bin_id): + """Returns center of bin 'bin_id'""" + return self._a+(bin_id+0.5)*(self._b-self._a)/self._n_bins + + def draw(self): + """Affiche l'histogramme en console""" + y_max = self.get_maximum() + scale = 1. + if y_max > 50: + scale = 50*1./y_max + y_max = 50 + # vertical axis : 50 pxl + y = y_max + 1 + while y > 0: + to_print = "" + if (y == y_max+1): + to_print = "y ^" + else: + to_print = " |" + + for x in range(self._n_bins): + if self._data[x] >= y/scale: + to_print += "#" + else: + to_print += " " + print (to_print) + y-=1 + + print (" -"+"+"+(self._n_bins-1)*"-"+">") + print (" |"+(self._n_bins-1)*" "+"x") + if scale != 1: + print ("y-axis scale : 1 # = {}".format(1./scale)) + + def save(self, file_name): + """Save histogram as CSV file""" + with open(file_name,'w') as file: + writer = csv.writer(file) + writer.writerow(["bin id","bin center", "n entries"]) + bin_size = (self._b - self._a)*1./self._n_bins + for bin_id, bin_content in enumerate(self._data): + writer.writerow([bin_id, self._a+(bin_id+0.5)*bin_size, bin_content]) + + + def load(self, file_name): + """Read histogram from CSV file""" + self._data = [] + line_id = 0 + first_bin_center = 0 + second_bin_center = 0 + with open(file_name,'r') as file: + reader = csv.reader(file) + for row in reader: + line_id += 1 + if line_id == 1: + #Skipping line containing column titles + continue + else: + if line_id == 2: + first_bin_center = float(row[1]) + elif line_id == 3: + second_bin_center = float(row[1]) + + self._data.append(float(row[2])) + bin_width = second_bin_center - first_bin_center + self._a = first_bin_center-0.5*bin_width + self._b = self._a + len(self._data) * bin_width + self._n_bins = len(self._data) + + def print(self): + print(len(self._data)) + print(self._data) + + bin_size = (self._b - self._a)*1./self._n_bins + bins = [self._a + i*bin_size for i in range(len(self._data))] + print(bins) + plt.hist(bins[:],bins,weights = self._data) + plt.show() + + + def _compute_integral(self): + """Computes the integral of the histogram""" + self._cached_integral = [ 0 for i in self._data] + for index, value in enumerate(self._data): + if index == 0: + self._cached_integral[0] = value + else: + self._cached_integral[index] = self._cached_integral[index-1]+value + + def get_integral(self, a = -1, b = -1): + """Returns integral between bin ]a and b]""" + if self._cached_integral == None: + self._compute_integral() + if b < 0: + b = len(self._data)-1 + if a < 0: + return self._cached_integral[b] + else: + return self._cached_integral[b]-self._cached_integral[a] + + def get_random(self): + if self._cached_integral == None: + self._compute_integral() + + yy = random.random()*self._cached_integral[-1] + + for xx_bin, integ in enumerate(self._cached_integral): + + if integ >= yy: + return self.get_bin_center(xx_bin) + + TypeError("This state should never be reached.") + + + def fill_random(self, histo, n_entries): + """Fills this histogram n times following 'histo' distribution""" + for i in range(n_entries): + self.fill(histo.get_random()) + + def get_average(self): + """Returns average of histogram""" + av = 0 + for index, value in enumerate(self._data): + av += self.get_bin_center(index)*value + return av*1./self.get_integral() -- GitLab