feat: add backend for multiplayer support

This commit is contained in:
2025-07-29 01:32:02 -04:00
parent 3f1fe2c693
commit 8ea2d591a0
32 changed files with 300 additions and 3 deletions

129
frontend/.gitignore vendored Normal file
View File

@@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

27
frontend/Dockerfile Normal file
View File

@@ -0,0 +1,27 @@
FROM python:3.12-slim
# Install build dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Set working directory
WORKDIR /app
# Copy project files
COPY . .
# Install PyYAML and websockets into __pypackages__/3.12/lib
RUN mkdir -p __pypackages__/3.12/lib && \
PYTHONPATH=__pypackages__/3.12/lib \
pip install --no-cache-dir --target=__pypackages__/3.12/lib \
PyYAML==6.0.2 websockets==15.0.1
# Build the web version
RUN pygbag --build main.py
# Expose the desired port
EXPOSE 3010
# Serve the build output
WORKDIR /app/build/web
CMD ["python", "-m", "http.server", "3010"]

101
frontend/data/config.yaml Normal file
View File

@@ -0,0 +1,101 @@
window:
width: 800
height: 600
title: "Tetri5"
sound:
theme-music-single: "data/sound/theme_music_single.ogg"
theme-music-multi: "data/sound/theme_music_multi.ogg"
option-change: "data/sound/option_change.ogg"
piece-rotate: "data/sound/piece_rotate.ogg"
piece-set: "data/sound/piece_set.ogg"
line-complete: "data/sound/line_complete.ogg"
four-lines-complete: "data/sound/four_lines_complete.ogg"
level-up: "data/sound/level_up.ogg"
game-over: "data/sound/game_over.ogg"
you-win: "data/sound/you_win.ogg"
image:
title-screen: "data/image/title_screen.png"
window-icon: "data/image/tetris_icon.png"
font: "data/image/press-start-2p-font.bmp"
position:
# title scene
title-logo: [240, 60]
option-one: [320, 390]
option-two: [320, 440]
cursor-option-one: [300, 399]
cursor-option-two: [300, 449]
# single player game scene
top-label: [80, 120]
top-value: [80, 140]
score-label: [80, 180]
score-value: [80, 200]
lines-label: [300, 40]
next-label: [600, 120]
level-label: [600, 220]
well: [280, 80]
next-piece: [620, 160]
spawn-piece: [-260, -60]
# connection scene
game-id-label: [200, 300]
connecting-label: [280, 300]
waiting-for-opponent-label: [200, 300]
# multi player game scene
well-player-1: [80, 80]
well-player-2: [480, 80]
score-label-player-1: [140, 20]
score-label-player-2: [540, 20]
score-value-player-1: [140, 40]
score-value-player-2: [540, 40]
lines-label-player-1: [100, 540]
lines-label-player-2: [500, 540]
next-label-player-1: [360, 120]
next-piece-player-1: [360, 160]
spawn-piece-player-1: [-200, -60]
spawn-piece-player-2: [200, -360]
engine:
fps: 60
tile-size: 20
cursor-blink-interval: 150
period-blink-interval: 300
lines-per-level: 5
ping-interval: 1500
# piece
piece-drop-delay: 600
piece-lock-delay: 1000
piece-drop-delay-decrease: 56
# points
points-table:
- 0 # 0 line
- 40 # 1 line
- 100 # 2 lines
- 300 # 3 lines
- 1200 # 4 lines
color:
# window
window-bg: "#000000"
# title screen
cursor: "#FFFFFF"
# in game
well-1: "#000000"
well-border-1: "#9BFCF0"
well-2: "#9BFCF0"
well-border-2: "#000000"
piece-1-player-1: "#1F37EC"
piece-2-player-1: "#5BDB57"
piece-3-player-1: "#F5F5F5"
piece-inner-border-1-player-1: "#1F37EC"
piece-1-player-2: "#F83801"
piece-2-player-2: "#7F7F7F"
piece-3-player-2: "#F5F5F5"
piece-inner-border-1-player-2: "#F83801"
piece-outer-border-1: "#000000"
piece-ghost: "#9BFCF0"
online:
server-url: "ws://localhost:5085"
#server-url: "ws://localhost:5001"

Binary file not shown.

After

Width:  |  Height:  |  Size: 192 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.9 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

20
frontend/main.py Normal file
View File

@@ -0,0 +1,20 @@
'''
Tetris 101:
https://strategywiki.org/wiki/Tetris/Getting_Started
https://tetris.com/play-tetris
'''
import asyncio
from tetri5.game import Game
from tetri5.util import ConfigurationManager
async def run_game():
ConfigurationManager.init()
Game.init()
while True:
Game.update()
Game.draw()
await asyncio.sleep(0) # yield control to browser
if __name__ == "__main__":
asyncio.run(run_game())

View File

@@ -0,0 +1,4 @@
pygame==2.5.2
pygbag==0.9.2
PyYAML==6.0.2
websockets==15.0.1

View File

@@ -0,0 +1,2 @@
pygame==2.5.2
pygbag==0.9.2

View File

492
frontend/tetri5/entity.py Normal file
View File

@@ -0,0 +1,492 @@
import copy
import random
import pygame
from typing import List, Tuple
from types import FunctionType
from tetri5.util import ConfigurationManager
from tetri5.util import Controller
from tetri5.util import SoundManager
"""
TODO description
"""
class Entity:
def __init__(self, points: Tuple, base_color: str, outer_color: str = None):
self._tile_size = ConfigurationManager.get("engine", "tile-size")
self._points = points
self._base_color = base_color
self._outer_color = outer_color
self._elapsed_time = 0 # TODO does nothing right now
def update(self, elapsed_time: int) -> None:
self._elapsed_time += elapsed_time
def draw(self, surface: pygame.Surface) -> None:
for square in self._points:
if self._base_color is not None:
square_color = pygame.Color(self._base_color)
pygame.draw.polygon(surface, square_color, square, 0)
if self._outer_color is not None:
pygame.draw.polygon(surface, pygame.Color(self._outer_color), square, max(self._tile_size // 6, 1))
def collide(self, entity: "Entity") -> bool: # TODO figure out how to do type hint for entity param of type Entity
for square_one in self._points:
for square_two in entity._points:
for i in range(4): # 4 vertices
if square_one[i][0] == square_two[i][0] and square_one[i][1] == square_two[i][1]:
return True
return False
def _collide(self, points: List) -> bool:
for square_one in self._points:
for square_two in points:
for i in range(4): # 4 vertices
if square_one[i][0] == square_two[i][0] and square_one[i][1] == square_two[i][1]:
return True
return False
"""
TODO description
"""
class Well(Entity):
WIDTH = 10 # standard tetris well width, should not be changed
HEIGHT = 20 # standard tetris well height, should not be changed
def __init__(self, position: Tuple, base_color: str, outer_color: str):
super().__init__(Well._get_points(position), base_color, outer_color)
@classmethod
def _get_points(cls, position: Tuple) -> List:
tile_size = ConfigurationManager.get("engine", "tile-size")
shape = []
for i in range(cls.WIDTH + 2):
for j in range(cls.HEIGHT + 2):
if i in [0, cls.WIDTH + 1]:
shape.append(((i, j), (i + 1, j), (i + 1, j + 1), (i, j + 1)))
elif j in [0, cls.HEIGHT + 1]:
shape.append(((i, j), (i + 1, j), (i + 1, j + 1), (i, j + 1)))
points = []
for square in shape:
sub_points = []
for vertex in square:
point = [vertex[0] * tile_size + position[0], vertex[1] * tile_size + position[1]]
sub_points.append(point)
points.append(sub_points)
return points
'''
For information on the Tetris piece Tetromino go here:
https://tetris.fandom.com/wiki/Tetromino
'''
class Piece(Entity):
def __init__(self, shape: Tuple, position: Tuple, base_color: str, inner_color: str, outer_color: str):
super().__init__(Piece._get_points(shape, position), base_color, outer_color)
self._inner_color = inner_color
self._ghost_piece_base_color = ConfigurationManager.get("color", "piece-ghost")
self._center = self._get_center(shape, position)
self._previous_points = None
self._previous_center = None
# Drop delay
self._drop_delay = ConfigurationManager.get("engine", "piece-drop-delay") # in ms
self._drop_delay_acc = 0 # acumulated drop delay, in ms
self._applying_drop_delay = True
# Lock delay
self._lock_delay = ConfigurationManager.get("engine", "piece-lock-delay") # in ms
self._lock_delay_acc = 0 # acumulated lock delay, in ms
self._applying_lock_delay = False
def update(self, elapsed_time: int, well: Well, stack: "Stack", level: int, clear_current_piece: FunctionType) -> None:
super().update(elapsed_time)
# handle rotation, left and right movement
if Controller.key_down(pygame.K_UP):
if self.rotate_with_wall_kick(well, stack):
SoundManager.play_piece_rotate_sfx()
if Controller.key_down(pygame.K_LEFT):
self.move((-self._tile_size, 0))
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
if Controller.key_down(pygame.K_RIGHT):
self.move((self._tile_size, 0))
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
# handle soft drop movement and gravity based on level
start_drop_delay = ConfigurationManager.get("engine", "piece-drop-delay")
start_lock_delay = ConfigurationManager.get("engine", "piece-lock-delay")
drop_delay_decrease = ConfigurationManager.get("engine", "piece-drop-delay-decrease")
if Controller.key_pressed(pygame.K_DOWN):
self._drop_delay = max(10, (start_drop_delay - (level * drop_delay_decrease)) // 10)
self._set_time = max(10, start_lock_delay // 10)
if not Controller.key_pressed(pygame.K_DOWN):
self._drop_delay = start_drop_delay - (level * drop_delay_decrease)
self._set_time = start_lock_delay
# handle hard drop
if Controller.key_down(pygame.K_SPACE):
self._points = self._get_ghost_piece_points(well, stack)
self._add_to_stack(stack, clear_current_piece)
if self._applying_drop_delay:
self._applying_drop_delay = self._apply_drop(elapsed_time, well, stack)
self._applying_lock_delay = not self._applying_drop_delay
"""
For more information on the piece set logic go here:
https://strategywiki.org/wiki/Tetris/Features#Lock_delay
"""
if self._applying_lock_delay:
self._applying_lock_delay = self._apply_lock(elapsed_time, well, stack, clear_current_piece)
self._applying_drop_delay = not self._applying_lock_delay
def draw(self, surface: pygame.Surface, well: Well = None, stack: "Stack" = None, ghost_piece_off: bool = False) -> None:
# ghost piece
if well is not None and stack is not None and not ghost_piece_off:
ghost_piece_points = self._get_ghost_piece_points(well, stack)
if ghost_piece_points is not None:
for square in ghost_piece_points:
pygame.draw.polygon(surface, pygame.Color(self._ghost_piece_base_color), square, max(self._tile_size // 6, 1)) # TODO add white to the yaml
for square in self._points:
if self._base_color is not None:
square_color = pygame.Color(ConfigurationManager.get("color", self._base_color))
pygame.draw.polygon(surface, square_color, square, 0)
if self._outer_color is not None:
pygame.draw.polygon(surface, pygame.Color(ConfigurationManager.get("color", self._outer_color)), square, max(self._tile_size // 6, 1))
for square in self._points:
# inner color border piece
if self._inner_color:
vertex_one = (square[0][0] + (self._tile_size // 10), square[0][1] + (self._tile_size // 10))
vertex_two = (square[1][0] - (self._tile_size // 10), square[1][1] + (self._tile_size // 10))
vertex_three = (square[2][0] - (self._tile_size // 10), square[2][1] - (self._tile_size // 10))
vertex_four = (square[3][0] + (self._tile_size // 10), square[3][1] - (self._tile_size // 10))
new_square = (vertex_one, vertex_two, vertex_three, vertex_four)
pygame.draw.polygon(surface, pygame.Color(ConfigurationManager.get("color", self._inner_color)), new_square, max(self._tile_size // 6, 1))
# draw glimmer
surface.set_at((square[0][0]+3, square[0][1]+3), "white")
surface.set_at((square[0][0]+4, square[0][1]+4), "white")
surface.set_at((square[0][0]+5, square[0][1]+5), "white")
surface.set_at((square[0][0]+4, square[0][1]+5), "white")
surface.set_at((square[0][0]+5, square[0][1]+4), "white")
def move(self, vector: Tuple) -> None:
self._previous_points = copy.deepcopy(self._points)
self._previous_center = copy.deepcopy(self._center)
self._center[0] += vector[0]
self._center[1] += vector[1]
for square in self._points:
for vertex in square:
vertex[0] += vector[0]
vertex[1] += vector[1]
def rotate_with_wall_kick(self, well: Well, stack: "Stack") -> bool:
self._rotate()
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
# trying kick to the left
self.move((-self._tile_size, 0))
self._rotate(True)
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
else:
# successful kick to the left
return True
# trying kick to the right
self.move((self._tile_size, 0))
self._rotate(True)
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
else:
# successful kick to the right
return True
# trying kick from top
self.move((0, self._tile_size))
self._rotate(True)
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
else:
# successful kick from top
return True
# trying kick 2 tiles from top
self.move((0, self._tile_size * 2))
self._rotate(True)
if well and self.collide(well) or stack and self.collide(stack):
self.revert()
else:
# successful kick 2 tiles from top
return True
else:
return True
# failed rotation
return False
def revert(self) -> None:
if self._previous_points and self._previous_center:
self._points = self._previous_points
self._center = self._previous_center
'''
For more information on a rotation of a piece go here:
https://gamedev.stackexchange.com/questions/17974/how-to-rotate-blocks-in-tetris
'''
def _rotate(self, no_revert: bool = False) -> None:
if not no_revert:
self._previous_points = copy.deepcopy(self._points)
self._previous_center = copy.deepcopy(self._center)
new_points = []
for square in self._points:
for vertex in square:
h = vertex[0] - self._center[0]
k = vertex[1] - self._center[1]
vertex[0] = (k * -1) + self._center[0]
vertex[1] = h + self._center[1]
new_points.append([square[-1]] + square[0:-1])
self._points = new_points
@classmethod
def _get_points(self, shape: Tuple, position: Tuple) -> List:
tile_size = ConfigurationManager.get("engine", "tile-size")
points = []
for square in shape[:-1]:
sub_points = []
for vertex in square:
point = [vertex[0] * tile_size + position[0], vertex[1] * tile_size + position[1]]
sub_points.append(point)
points.append(sub_points)
return points
def _get_center(self, shape: Tuple, position: Tuple) -> List:
center = shape[-1]
return [int(center[0] * self._tile_size + position[0]), int(center[1] * self._tile_size + position[1])]
def _apply_drop(self, elapsed_time: int, well: Well, stack: "Stack") -> bool:
self._drop_delay_acc += elapsed_time
if self._drop_delay_acc >= self._drop_delay:
self._drop_delay_acc = 0
if not self._entity_is_below(well) and not self._entity_is_below(stack):
self.move((0, self._tile_size))
else:
return False
return True
def _apply_lock(self, elapsed_time: int, well: Well, stack: "Stack", clear_current_piece: FunctionType) -> bool:
self._lock_delay_acc += elapsed_time
if self._lock_delay_acc >= self._lock_delay:
self._add_to_stack(stack, clear_current_piece)
return False
return self._entity_is_below(well) or self._entity_is_below(stack)
def _add_to_stack(self, stack: "Stack", clear_current_piece: FunctionType) -> None:
SoundManager.play_piece_set_sfx()
stack.add_piece(self)
clear_current_piece()
def _mimic_move(self, vector: Tuple) -> List:
mimic_points = copy.deepcopy(self._points)
for square in mimic_points:
for vertex in square:
vertex[0] += vector[0]
vertex[1] += vector[1]
return mimic_points
def _entity_is_below(self, entity: Entity) -> bool:
mimic_points = self._mimic_move((0, self._tile_size))
return entity and entity._collide(mimic_points)
def _get_ghost_piece_points(self, well: Well, stack: "Stack") -> List:
current_points = copy.deepcopy(self._points)
prior_points = None
while not well._collide(current_points) and not stack._collide(current_points):
prior_points = copy.deepcopy(current_points)
for square in current_points:
for vertex in square:
vertex[1] += 1 * self._tile_size
return prior_points
# shape attributes
I_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((2, 0), (3, 0), (3, 1), (2, 1)), ((3, 0), (4, 0), (4, 1), (3, 1)), (2, 0))
J_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((2, 0), (3, 0), (3, 1), (2, 1)), ((2, 1), (3, 1), (3, 2), (2, 2)), (1.5, 0.5))
L_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((2, 0), (3, 0), (3, 1), (2, 1)), ((0, 1), (1, 1), (1, 2), (0, 2)), (1.5, 0.5))
O_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((1, 1), (2, 1), (2, 2), (1, 2)), ((0, 1), (1, 1), (1, 2), (0, 2)), (1, 1))
S_SHAPE = (((0, 1), (1, 1), (1, 2), (0, 2)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((1, 1), (2, 1), (2, 2), (1, 2)), ((2, 0), (3, 0), (3, 1), (2, 1)), (1.5, 0.5))
T_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((1, 1), (2, 1), (2, 2), (1, 2)), ((2, 0), (3, 0), (3, 1), (2, 1)), (1.5, 0.5))
Z_SHAPE = (((0, 0), (1, 0), (1, 1), (0, 1)), ((1, 0), (2, 0), (2, 1), (1, 1)), ((1, 1), (2, 1), (2, 2), (1, 2)), ((2, 1), (3, 1), (3, 2), (2, 2)), (1.5, 0.5))
"""
TODO description
"""
class Stack(Entity):
def __init__(self):
super().__init__([], None, None)
self._square_colors = []
self.total_lines = 0
self.lines_completed_last = 0
def update(self, elapsed_time: int) -> None:
super().update(elapsed_time)
self.lines_completed_last = self._complete_rows()
self.total_lines += self.lines_completed_last
def draw(self, surface: pygame.Surface) -> None:
# only draw if squares and colors are aligned
if len(self._square_colors) != len(self._points):
return
for i in range(len(self._points)):
square = self._points[i]
square_design = self._square_colors[i]
if square_design.base_color is not None:
pygame.draw.polygon(surface, pygame.Color(ConfigurationManager.get("color", square_design.base_color)), square, 0)
if square_design.outer_color is not None:
pygame.draw.polygon(surface, pygame.Color(ConfigurationManager.get("color", square_design.outer_color)), square, max(self._tile_size // 6, 1))
if square_design.inner_color is not None:
vertex_one = (square[0][0] + (self._tile_size // 10), square[0][1] + (self._tile_size // 10))
vertex_two = (square[1][0] - (self._tile_size // 10), square[1][1] + (self._tile_size // 10))
vertex_three = (square[2][0] - (self._tile_size // 10), square[2][1] - (self._tile_size // 10))
vertex_four = (square[3][0] + (self._tile_size // 10), square[3][1] - (self._tile_size // 10))
new_square = (vertex_one, vertex_two, vertex_three, vertex_four)
pygame.draw.polygon(surface, pygame.Color(ConfigurationManager.get("color", square_design.inner_color)), new_square, max(self._tile_size // 6, 1))
# draw glimmer
surface.set_at((square[0][0]+3, square[0][1]+3), pygame.Color(255, 255, 255))
surface.set_at((square[0][0]+4, square[0][1]+4), pygame.Color(255, 255, 255))
surface.set_at((square[0][0]+5, square[0][1]+5), pygame.Color(255, 255, 255))
surface.set_at((square[0][0]+4, square[0][1]+5), pygame.Color(255, 255, 255))
surface.set_at((square[0][0]+5, square[0][1]+4), pygame.Color(255, 255, 255))
def add_piece(self, piece: Piece) -> None:
self._points += piece._points
self._square_colors += [SquareColor(piece._base_color, piece._inner_color, piece._outer_color) for _ in range(len(piece._points))]
# TODO refactor into multiple functions
def _complete_rows(self) -> int:
squares_by_row = {}
for square in self._points:
top_left_vertex = square[0]
if top_left_vertex[1] not in squares_by_row:
squares_by_row[top_left_vertex[1]] = []
if square not in squares_by_row[top_left_vertex[1]]:
squares_by_row[top_left_vertex[1]].append(square)
squares_to_exclude = []
rows_completed = []
for key, value in squares_by_row.items():
if len(squares_by_row[key]) == Well.WIDTH:
squares_to_exclude += value
rows_completed.append(key)
if not squares_to_exclude:
return 0
if len(rows_completed) == 4:
SoundManager.play_four_lines_complete_sfx()
else:
SoundManager.play_line_complete_sfx()
new_points = []
self.new_square_colors = []
for i in range(len(self._points)):
square = self._points[i]
if square not in squares_to_exclude:
for vertex in square:
distance_to_move = sum(
vertex[1] <= row_completed
for row_completed in rows_completed
)
vertex[1] += self._tile_size * distance_to_move
new_points.append(square)
self.new_square_colors.append(self._square_colors[i])
self._points = new_points
self._square_colors = self.new_square_colors
return len(rows_completed)
"""
TODO description
"""
class SquareColor:
def __init__(self, base_color: str, inner_color: str, outer_color: str):
self.base_color = base_color
self.inner_color = inner_color
self.outer_color = outer_color
"""
TODO description
"""
class PieceGenerator:
_bucket = []
@classmethod
def get_piece(cls, position: Tuple) -> Piece:
if len(cls._bucket) == 0:
cls._generate_bucket(cls._bucket)
base_color, inner_color, outer_color = cls._get_piece_color()
return Piece(cls._get_piece_shape(cls._bucket.pop()), position, base_color, inner_color, outer_color)
@classmethod
def get_opponent_piece(cls, base_color: str, inner_color: str, outer_color: str) -> Piece:
return Piece(Piece.Z_SHAPE, (-250, -250), base_color, inner_color, outer_color)
@classmethod
def _generate_bucket(cls, bucket: List) -> None:
piece_types = list(range(7))
while len(bucket) != 7:
random_number = random.randint(0, 6 - len(bucket))
bucket.append(piece_types.pop(random_number))
@classmethod
def _get_piece_shape(cls, piece_number: int) -> Tuple:
if piece_number == 0:
return Piece.I_SHAPE
if piece_number == 1:
return Piece.J_SHAPE
if piece_number == 2:
return Piece.L_SHAPE
if piece_number == 3:
return Piece.O_SHAPE
if piece_number == 4:
return Piece.S_SHAPE
if piece_number == 5:
return Piece.T_SHAPE
if piece_number == 6:
return Piece.Z_SHAPE
return None
@classmethod
def _get_piece_color(cls) -> Tuple:
random_number = random.randint(1, 3)
base_color = "piece-" + str(random_number) + "-player-1"
inner_color = None if random_number != 3 else "piece-inner-border-1-player-1"
outer_color = "piece-outer-border-1"
return (base_color, inner_color, outer_color)

62
frontend/tetri5/game.py Normal file
View File

@@ -0,0 +1,62 @@
import sys
from tetri5.online import MultiplayerService
import pygame
from tetri5.util import ConfigurationManager
from tetri5.util import TextGenerator
from tetri5.util import SoundManager
from tetri5.scene import Scene, TitleScene
# TODO improve game assets https://www.spriters-resource.com/nes/tetris/
# TODO create a util that manages sfx
class Game:
_current_scene = None
@classmethod
def init(cls) -> None:
pygame.init()
SoundManager.init()
TextGenerator.init(ConfigurationManager.get("image", "font"), (20, 20))
cls._current_scene = TitleScene(Game.change_scene)
win_width = ConfigurationManager.get("window", "width")
win_height = ConfigurationManager.get("window", "height")
win_title = ConfigurationManager.get("window", "title")
win_icon = ConfigurationManager.get("image", "window-icon")
cls.fps = ConfigurationManager.get("engine", "fps")
cls.tile_size = ConfigurationManager.get("engine", "tile-size")
cls.screen = pygame.display.set_mode((win_width, win_height))
cls.clock = pygame.time.Clock()
pygame.display.set_caption(win_title)
pygame.display.set_icon(pygame.image.load(win_icon))
@classmethod
def update(cls) -> None:
# TODO write not initialized exception
elapsed_time = cls.clock.tick(cls.fps)
if cls._current_scene:
cls._current_scene.update(elapsed_time)
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
MultiplayerService.quit()
sys.exit()
@classmethod
def draw(cls) -> None:
# TODO write not initialized exception
if cls._current_scene:
cls._current_scene.draw(cls.screen)
# update display
pygame.display.flip()
@classmethod
def change_scene(cls, scene: Scene) -> None:
cls._current_scene = scene

27
frontend/tetri5/modal.py Normal file
View File

@@ -0,0 +1,27 @@
import pygame
from pygame import Surface
from tetri5.util import SoundManager, TextGenerator
class GameOverModal:
def __init__(self):
self.is_open = False
self.is_winner = False
self.music_played = False
def show(self, is_winner: bool) -> None:
self.is_open = True
self.is_winner = is_winner
def draw(self, surface: Surface) -> None:
pygame.draw.rect(surface, "black", pygame.Rect(290, 240, 220, 100))
pygame.draw.rect(surface, "white", pygame.Rect(290, 240, 220, 100), 3, 5)
TextGenerator.draw("YOU LOSE" if not self.is_winner else "YOU WIN!", (320, 260), surface)
TextGenerator.draw("GAME OVER", (310, 300), surface)
def update(self, _: int) -> None:
if not self.music_played:
if self.is_winner:
SoundManager.play_you_win_sfx()
else:
SoundManager.play_game_over_sfx()
self.music_played = True

287
frontend/tetri5/online.py Normal file
View File

@@ -0,0 +1,287 @@
import sys
sys.path.append("__pypackages__/3.12/lib/")
import websockets # non-native package
import asyncio
import json
import queue # refer to https://docs.python.org/3/library/queue.html for cross threading communication
import uuid
from typing import Dict, List
from threading import Thread
from tetri5.util import ConfigurationManager
class MultiplayerService():
_thread = None
_current_game_id = None
_client_id = str(uuid.uuid4())
""" QUEUES """
_receive_piece_queue = queue.Queue()
_send_piece_queue = queue.Queue()
_receive_stack_queue = queue.Queue()
_send_stack_queue = queue.Queue()
_receive_stats_queue = queue.Queue()
_send_stats_queue = queue.Queue()
_receive_message_queue = queue.Queue()
_send_message_queue = queue.Queue()
WAIT_FOR_OPPONENT = "wait_for_opponent"
START_GAME = "start_game"
@classmethod
def init(cls) -> None:
thread = Thread(target=asyncio.run, args=(_NetworkConnectionService.init(),))
thread.start()
""" SEND """
@classmethod
def send_piece(cls, piece: "PieceDto") -> None:
cls._send_piece_queue.put(piece)
@classmethod
def send_stack(cls, stack: "StackDto") -> None:
cls._send_stack_queue.put(stack)
@classmethod
def send_stats(cls, stats: "StatsDto") -> None:
cls._send_stats_queue.put(stats)
@classmethod
def send_message(cls, message: str) -> None:
cls._send_message_queue.put(message)
""" RECEIVE """
@classmethod
def try_receive_piece(cls) -> "PieceDto":
if cls._receive_piece_queue.empty():
return None
result = cls._receive_piece_queue.get()
cls._receive_piece_queue.task_done()
return result
@classmethod
def try_receive_stack(cls) -> "StackDto":
if cls._receive_stack_queue.empty():
return None
result = cls._receive_stack_queue.get()
cls._receive_stack_queue.task_done()
return result
@classmethod
def try_receive_stats(cls) -> "StatsDto":
if cls._receive_stats_queue.empty():
return None
result = cls._receive_stats_queue.get()
cls._receive_stats_queue.task_done()
return result
@classmethod
def try_receive_message(cls) -> str:
if cls._receive_message_queue.empty():
return None
result = cls._receive_message_queue.get()
cls._receive_message_queue.task_done()
return result
""" MISC """
@classmethod
def enter_game(cls, game_id: str) -> None:
cls._current_game_id = game_id
_NetworkConnectionService._join_game = True # TODO: change this to a function
@classmethod
def quit(cls) -> None:
_NetworkConnectionService.close_connection()
class _NetworkConnectionService():
_websocket = None
_is_closed = False
_join_game = False
_pending_receive_task = None
@classmethod
async def init(cls) -> None:
await cls._connect_to_server()
await cls._run_network_loop()
""" NETWORK SEND """
@classmethod
async def _try_enter_game(cls) -> None:
if not cls._join_game:
return
json_message = json.dumps({"action": "enter_game",\
"clientId": MultiplayerService._client_id,\
"gameId": MultiplayerService._current_game_id})
await cls._websocket.send(json_message)
cls._join_game = False
@classmethod
async def _try_send_piece(cls) -> None:
# if no messages to proccess, return
if MultiplayerService._send_piece_queue.empty():
return
# get next piece to send and send to server
piece = MultiplayerService._send_piece_queue.get()
# construct json message
json_message = json.dumps({"action": "send_piece",\
"clientId": MultiplayerService._client_id,\
"gameId": MultiplayerService._current_game_id,\
"piece": piece.__dict__})
await cls._websocket.send(json_message)
MultiplayerService._send_piece_queue.task_done()
@classmethod
async def _try_send_stack(cls) -> None:
# if no messages to proccess, return
if MultiplayerService._send_stack_queue.empty():
return
# get next stack to send and send to server
stack = MultiplayerService._send_stack_queue.get()
# construct json message
json_message = json.dumps({"action": "send_stack",\
"clientId": MultiplayerService._client_id,\
"gameId": MultiplayerService._current_game_id,\
"stack": stack.__dict__})
await cls._websocket.send(json_message)
MultiplayerService._send_stack_queue.task_done()
@classmethod
async def _try_send_stats(cls) -> None:
# if no messages to proccess, return
if MultiplayerService._send_stats_queue.empty():
return
# get next stats to send and send to server
stats = MultiplayerService._send_stats_queue.get()
# construct json message
json_message = json.dumps({"action": "send_stats",\
"clientId": MultiplayerService._client_id,\
"gameId": MultiplayerService._current_game_id,\
"stats": stats.__dict__})
await cls._websocket.send(json_message)
MultiplayerService._send_stats_queue.task_done()
@classmethod
async def _try_send_message(cls) -> None:
# if no messages to proccess, return
if MultiplayerService._send_message_queue.empty():
return
# get next message to send and send to server
message = MultiplayerService._send_message_queue.get()
await cls._websocket.send(message)
MultiplayerService._send_message_queue.task_done()
""" NETWORK RECEIVE """
# todo refactor
@classmethod
async def _try_receive_message(cls) -> None:
try:
task = cls._pending_receive_task or asyncio.create_task(cls._websocket.recv())
done, pending = await asyncio.wait({task}, timeout=2e-3) # TODO experiment with the timeout
if task in done:
json_str = await task
if json_str == "pong":
print("pong")
cls._pending_receive_task = None
return
data = json.loads(json_str)
if data["type"] == "wait_for_opponent":
MultiplayerService._receive_message_queue.put(MultiplayerService.WAIT_FOR_OPPONENT)
if data["type"] == "start_game":
MultiplayerService._receive_message_queue.put(MultiplayerService.START_GAME)
if data["type"] == "receive_piece":
MultiplayerService._receive_piece_queue.put(PieceDto.create(data["piece"]))
if data["type"] == "receive_stack":
MultiplayerService._receive_stack_queue.put(StackDto.create(data["stack"]))
if data["type"] == "receive_stats":
MultiplayerService._receive_stats_queue.put(StatsDto.create(data["stats"]))
if data["type"] == "exit_game":
print("Exit the game!")
cls.close_connection()
cls._pending_receive_task = None
elif len(pending):
cls._pending_receive_task = pending.pop()
finally:
pass # TODO handle connection closed exception and attempt to reconnect
""" MISC """
@classmethod
async def _run_network_loop(cls) -> None:
while True:
await asyncio.sleep(2e-3) # TODO add clock tick instead
await cls._try_enter_game()
await cls._try_send_piece()
await cls._try_send_stack()
await cls._try_send_stats()
await cls._try_send_message()
await cls._try_receive_message()
# if conection is closed, exit loop
if cls._is_closed:
await cls._websocket.close()
break
# ping_interval=None is important, otherwise the server will disconnect us
# https://stackoverflow.com/a/58993145/11512104
@classmethod
async def _connect_to_server(cls) -> None:
print("Connecting to server...") # TODO replace with logging
url = ConfigurationManager.get("online", "server-url")
cls._websocket = await websockets.connect(url, ping_interval=None)
print("Connected to server...") # TODO replace with logging
@classmethod
def close_connection(cls) -> None:
cls._is_closed = True
# DTOs
class PieceDto():
def __init__(self, points: List, center: List, base_color: str, inner_color: str, outer_color: str) -> None:
self.points = points
self.center = center
self.base_color = base_color
self.inner_color = inner_color
self.outer_color = outer_color
@staticmethod
def create(data: Dict) -> "PieceDto":
return PieceDto(data["points"], data["center"], data["base_color"], data["inner_color"], data["outer_color"])
class StackDto():
def __init__(self, points: List, square_colors: List[Dict]) -> None:
self.points = points
self.square_colors = square_colors
@staticmethod
def create(data: Dict) -> "StackDto":
return StackDto(data["points"], data["square_colors"])
class StatsDto():
def __init__(self, score: int, lines: int, is_well_full: bool) -> None:
self.score = score
self.lines = lines
self.is_well_full = is_well_full
@staticmethod
def create(data: Dict) -> "StatsDto":
return StatsDto(data["score"], data["lines"], data["is_well_full"])

479
frontend/tetri5/scene.py Normal file
View File

@@ -0,0 +1,479 @@
import pygame
from types import FunctionType
from tetri5.util import ConfigurationManager
from tetri5.util import TextGenerator
from tetri5.util import Controller
from tetri5.util import SoundManager
from tetri5.entity import SquareColor, Well
from tetri5.entity import Stack
from tetri5.entity import PieceGenerator
from tetri5.online import *
from tetri5.modal import GameOverModal
"""
TODO
"""
class Scene:
pass
"""
TODO
"""
class TitleScene(Scene):
# Title screen options
ONE_PLAYER = "1 PLAYER"
TWO_PLAYER = "2 PLAYER"
def __init__(self, change_scene: FunctionType) -> None:
self._tile_size = ConfigurationManager.get("engine", "tile-size")
self._background_color = pygame.Color(ConfigurationManager.get("color", "window-bg"))
self._logo_image = pygame.image.load(ConfigurationManager.get("image", "title-screen"))
self._cursor_pos_one = ConfigurationManager.get("position", "cursor-option-one")
self._cursor_pos_two = ConfigurationManager.get("position", "cursor-option-two")
self._cursor_pos = self._cursor_pos_one
self._cursor_blink_interval = ConfigurationManager.get("engine", "cursor-blink-interval")
self._cursor_blink_time = 0
self._cursor_color = pygame.Color(ConfigurationManager.get("color", "cursor"))
self._cursor_off = False
self._logo_pos = ConfigurationManager.get("position", "title-logo")
self._option_one_pos = ConfigurationManager.get("position", "option-one")
self._option_two_pos = ConfigurationManager.get("position", "option-two")
self._is_multiplayer = False
self._change_scence = change_scene
def draw(self, surface: pygame.Surface) -> None:
surface.fill(self._background_color)
surface.blit(self._logo_image, self._logo_pos)
TextGenerator.draw(TitleScene.ONE_PLAYER, self._option_one_pos, surface)
TextGenerator.draw(TitleScene.TWO_PLAYER, self._option_two_pos, surface)
if self._cursor_off:
pygame.draw.circle(surface, self._cursor_color, self._cursor_pos, self._tile_size // 3)
def update(self, elapsed_time: int) -> None:
option_change = False
if Controller.key_pressed(pygame.K_UP) and self._is_multiplayer:
self._cursor_pos = self._cursor_pos_one
self._is_multiplayer = False
option_change = True
if Controller.key_pressed(pygame.K_DOWN) and not self._is_multiplayer:
self._cursor_pos = self._cursor_pos_two
self._is_multiplayer = True
option_change = True
if option_change:
SoundManager.play_option_change_sfx() # TODO add cool down
self._cursor_blink_time += elapsed_time
if self._cursor_blink_time >= self._cursor_blink_interval:
self._cursor_blink_time = 0
self._cursor_off = not self._cursor_off
if Controller.key_pressed(pygame.K_RETURN):
self._change_scence(SinglePlayerScene(self._change_scence) if not self._is_multiplayer else ConnectionScene(self._change_scence))
"""
TODO
"""
class SinglePlayerScene(Scene):
def __init__(self, change_scene: FunctionType) -> None:
self._top_label_pos = ConfigurationManager.get("position", "top-label")
self._top_value_pos = ConfigurationManager.get("position", "top-value")
self._score_label_pos = ConfigurationManager.get("position", "score-label")
self._score_value_pos = ConfigurationManager.get("position", "score-value")
self._lines_label_pos = ConfigurationManager.get("position", "lines-label")
self._next_label_pos = ConfigurationManager.get("position", "next-label")
self._level_label_pos = ConfigurationManager.get("position", "level-label")
self._next_piece_pos = ConfigurationManager.get("position", "next-piece")
self._spawn_piece_shift = ConfigurationManager.get("position", "spawn-piece")
self._tile_size = ConfigurationManager.get("engine", "tile-size")
self._background_color = pygame.Color(ConfigurationManager.get("color", "window-bg"))
self._points_table = ConfigurationManager.get("engine", "points-table")
self._lines_per_level = ConfigurationManager.get("engine", "lines-per-level")
self._score = 0
self._previous_level = 0
self._current_piece = None
self._next_piece = PieceGenerator.get_piece(self._next_piece_pos)
self._well = Well(ConfigurationManager.get("position", "well"),\
ConfigurationManager.get("color", "well-1"),\
ConfigurationManager.get("color", "well-border-1"))
self._stack = Stack()
self._game_over_modal = GameOverModal()
self._change_scence = change_scene
SoundManager.play_theme_music_single()
def draw(self, surface: pygame.Surface) -> None:
if self._game_over_modal.is_open:
self._game_over_modal.draw(surface)
return
surface.fill(self._background_color)
if self._stack:
self._stack.draw(surface)
if self._current_piece:
self._current_piece.draw(surface, self._well, self._stack)
if self._next_piece:
self._next_piece.draw(surface)
if self._well:
self._well.draw(surface)
score = str(self._score).zfill(6)
lines = str(self._stack.total_lines).zfill(4)
level = str(self._get_level()).zfill(2)
TextGenerator.draw("Top", self._top_label_pos, surface)
TextGenerator.draw("000000", self._top_value_pos, surface)
TextGenerator.draw("Score", self._score_label_pos, surface)
TextGenerator.draw(score, self._score_value_pos, surface)
TextGenerator.draw("Lines " + lines, self._lines_label_pos, surface)
TextGenerator.draw("Next", self._next_label_pos, surface)
TextGenerator.draw("LVL " + level, self._level_label_pos, surface)
def update(self, elapsed_time: int) -> None:
if self._game_over_modal.is_open:
self._game_over_modal.update(elapsed_time)
return
if self._current_piece:
self._current_piece.update(elapsed_time,\
self._well,\
self._stack,\
self._get_level(),\
self._clear_current_piece)
else:
self._current_piece = self._next_piece
self._current_piece.move(self._spawn_piece_shift)
self._next_piece = PieceGenerator.get_piece(self._next_piece_pos)
# TODO create game over scene
if self._stack and self._current_piece.collide(self._stack):
SoundManager.stop_theme_music_single()
self._game_over_modal.show(False)
if self._stack:
self._stack.update(elapsed_time)
self._score += self._points_table[self._stack.lines_completed_last] * (self._get_level() + 1)
if self._previous_level != self._get_level():
self._previous_level = self._get_level()
SoundManager.play_level_up_sfx()
def _get_level(self) -> int:
return 0 if self._stack is None else self._stack.total_lines // self._lines_per_level
def _clear_current_piece(self) -> None:
self._current_piece = None
class ConnectionScene(Scene):
MAX_GAME_ID_LEN = 5
MAX_PERIOD_COUNT = 3
def __init__(self, change_scene: FunctionType) -> None:
self._background_color = pygame.Color(ConfigurationManager.get("color", "window-bg"))
self._is_connecting = False
self._waiting_for_opponent = False
self._game_id_label_pos = ConfigurationManager.get("position", "game-id-label")
self._game_id_label = "Enter Game Id: "
self._game_id = "_"
self._cursor_blink_interval = ConfigurationManager.get("engine", "cursor-blink-interval")
self._cursor_blink_acc = 0 # blink accumulator
self._connecting_label_pos = ConfigurationManager.get("position", "connecting-label")
self._connecting_label = "Connecting"
self._waiting_for_opponent_label_pos = ConfigurationManager.get("position", "waiting-for-opponent-label")
self._waiting_for_opponent_label = "Waiting for opponent"
self._period_blink_interval = ConfigurationManager.get("engine", "period-blink-interval")
self._period_blink_acc = 0 # period accumulator
self._ping_interval = ConfigurationManager.get("engine", "ping-interval")
self._ping_acc = 0 # ping accumulator
self._change_scene = change_scene
def draw(self, surface: pygame.Surface) -> None:
surface.fill(self._background_color)
if not self._is_connecting and not self._waiting_for_opponent:
TextGenerator.draw(self._game_id_label + self._game_id, self._game_id_label_pos, surface)
elif self._is_connecting and not self._waiting_for_opponent:
TextGenerator.draw(self._connecting_label, self._connecting_label_pos, surface)
else:
TextGenerator.draw(self._waiting_for_opponent_label, self._waiting_for_opponent_label_pos, surface)
def update(self, elapsed_time: int) -> None:
# cursor blink logic
self._cursor_blink_acc += elapsed_time
if self._cursor_blink_acc >= self._cursor_blink_interval:
self._cursor_blink_acc = 0
if self._game_id.find("_") != -1:
self._game_id = self._game_id.replace("_", "")
else:
self._game_id += "_"
# period ellipsis logic for connecting
if self._is_connecting and not self._waiting_for_opponent:
self._period_blink_acc += elapsed_time
if self._period_blink_acc >= self._period_blink_interval:
self._period_blink_acc = 0
period_count = self._connecting_label.count(".")
if period_count < self.MAX_PERIOD_COUNT:
self._connecting_label += "."
else:
self._connecting_label = self._connecting_label.replace(".", "")
# period ellipsis logic for waiting on opponent
if self._waiting_for_opponent:
self._period_blink_acc += elapsed_time
if self._period_blink_acc >= self._period_blink_interval:
self._period_blink_acc = 0
period_count = self._waiting_for_opponent_label.count(".")
if period_count < self.MAX_PERIOD_COUNT:
self._waiting_for_opponent_label += "."
else:
self._waiting_for_opponent_label = self._waiting_for_opponent_label.replace(".", "")
# keyboard input
for event in pygame.event.get(pygame.KEYDOWN):
# user input logic
if not self._is_connecting:
self._game_id = self._game_id.replace("_", "")
if event.key == pygame.K_BACKSPACE:
self._game_id = self._game_id[:-1]
elif (event.unicode.isalpha() or event.unicode.isdigit()) and\
len(self._game_id) <= self.MAX_GAME_ID_LEN:
self._game_id += event.unicode.upper()
# connection logic
if event.key == pygame.K_RETURN and len(self._game_id) > 0 and not self._is_connecting:
self._is_connecting = True
MultiplayerService.init()
MultiplayerService.enter_game(self._game_id)
# server messaging logic
self._ping_acc += elapsed_time
message = MultiplayerService.try_receive_message();
if message: # TODO remove later, only for testing
print(message)
if message == MultiplayerService.WAIT_FOR_OPPONENT:
self._waiting_for_opponent = True
if message == MultiplayerService.START_GAME:
self._change_scene(MultiplayerScene(self._change_scene))
if message is None and self._ping_acc >= self._ping_interval:
self._ping_acc = 0
MultiplayerService.send_message("ping")
"""
TODO
"""
class MultiplayerScene(Scene):
def __init__(self, change_scene: FunctionType) -> None:
self._tile_size = ConfigurationManager.get("engine", "tile-size")
self._background_color = pygame.Color(ConfigurationManager.get("color", "window-bg"))
self._lines_per_level = ConfigurationManager.get("engine", "lines-per-level")
self._points_table = ConfigurationManager.get("engine", "points-table")
self._score_player_one = 0
self._score_player_two = 0
self._total_lines_player_two = 0
# wells init
self._well_player_one = Well(ConfigurationManager.get("position", "well-player-1"),\
ConfigurationManager.get("color", "well-1"),\
ConfigurationManager.get("color", "well-border-1"))
self._well_player_two = Well(ConfigurationManager.get("position", "well-player-2"),\
ConfigurationManager.get("color", "well-1"),\
ConfigurationManager.get("color", "well-border-1"))
# stacks init
self._stack_player_one = Stack()
self._stack_player_two = Stack()
# score positions
self._score_label_player_one_pos = ConfigurationManager.get("position", "score-label-player-1")
self._score_label_player_two_pos = ConfigurationManager.get("position", "score-label-player-2")
self._score_value_player_one_pos = ConfigurationManager.get("position", "score-value-player-1")
self._score_value_player_two_pos = ConfigurationManager.get("position", "score-value-player-2")
# lines positions
self._lines_label_player_one_pos = ConfigurationManager.get("position", "lines-label-player-1")
self._lines_label_player_two_pos = ConfigurationManager.get("position", "lines-label-player-2")
# next positions
self._next_label_player_one_pos = ConfigurationManager.get("position", "next-label-player-1")
# piece positions
self._next_piece_player_one_pos = ConfigurationManager.get("position", "next-piece-player-1")
self._spawn_piece_shift_player_one = ConfigurationManager.get("position", "spawn-piece-player-1")
self._spawn_piece_shift_player_two = ConfigurationManager.get("position", "spawn-piece-player-2")
# entities
self._next_piece_player_one = PieceGenerator.get_piece(self._next_piece_player_one_pos)
self._current_piece_player_one = None
self._current_piece_player_two = None
self._game_over_modal = GameOverModal()
self._well_full = False
self._change_scence = change_scene
SoundManager.play_theme_music_multi()
def draw(self, surface: pygame.Surface) -> None:
if self._game_over_modal.is_open:
self._game_over_modal.draw(surface)
return
surface.fill(self._background_color)
# stacks
if self._stack_player_one is not None:
self._stack_player_one.draw(surface)
if self._stack_player_two is not None:
self._stack_player_two.draw(surface)
# pieces
if self._current_piece_player_one is not None:
self._current_piece_player_one.draw(surface, self._well_player_one, self._stack_player_one)
if self._current_piece_player_two is not None:
self._current_piece_player_two.draw(surface, self._well_player_two, self._stack_player_two, True)
if self._next_piece_player_one is not None:
self._next_piece_player_one.draw(surface)
# wells
if self._well_player_one is not None:
self._well_player_one.draw(surface)
if self._well_player_two is not None:
self._well_player_two.draw(surface)
score_player_one = str(self._score_player_one).zfill(6)
score_player_two = str(self._score_player_two).zfill(6)
lines_player_one = str(self._stack_player_one.total_lines).zfill(4)
lines_player_two = str(self._total_lines_player_two).zfill(4)
# scores
TextGenerator.draw("Score", self._score_label_player_one_pos, surface)
TextGenerator.draw(score_player_one, self._score_value_player_one_pos, surface)
TextGenerator.draw("Score", self._score_label_player_two_pos, surface)
TextGenerator.draw(score_player_two, self._score_value_player_two_pos, surface)
# lines
TextGenerator.draw("Lines " + lines_player_one, self._lines_label_player_one_pos, surface)
TextGenerator.draw("Lines " + lines_player_two, self._lines_label_player_two_pos, surface)
# next
TextGenerator.draw("NEXT", self._next_label_player_one_pos, surface)
def update(self, elapsed_time: int) -> None:
if self._game_over_modal.is_open:
self._game_over_modal.update(elapsed_time)
return
self._update_piece_player_one(elapsed_time)
self._update_piece_player_two()
if self._stack_player_one is not None:
self._stack_player_one.update(elapsed_time)
self._update_stack_player_two()
self._score_player_one += self._points_table[self._stack_player_one.lines_completed_last] * (self._get_level_player_one() + 1)
opponent_stats = MultiplayerService.try_receive_stats()
if opponent_stats is not None:
self._score_player_two = opponent_stats.score
self._total_lines_player_two = opponent_stats.lines
if opponent_stats.is_well_full:
SoundManager.stop_theme_music_multi()
self._game_over_modal.show(True)
if self._current_piece_player_one is not None:
MultiplayerService.send_piece(PieceDto(self._current_piece_player_one._points,\
self._current_piece_player_one._center,\
self._current_piece_player_one._base_color,\
self._current_piece_player_one._inner_color,\
self._current_piece_player_one._outer_color))
if self._stack_player_one is not None:
MultiplayerService.send_stack(
StackDto(
self._stack_player_one._points,
[
x.__dict__
for x in self._stack_player_one._square_colors
],
)
)
MultiplayerService.send_stats(StatsDto(self._score_player_one, self._stack_player_one.total_lines, self._well_full))
def _update_piece_player_one(self, elapsed_time: int) -> None:
if self._current_piece_player_one is not None:
self._current_piece_player_one.update(elapsed_time,\
self._well_player_one,\
self._stack_player_one,\
self._get_level_player_one(),\
self._clear_current_piece_player_one)
else:
self._current_piece_player_one = self._next_piece_player_one
self._current_piece_player_one.move(self._spawn_piece_shift_player_one)
self._next_piece_player_one = PieceGenerator.get_piece(self._next_piece_player_one_pos)
# TODO create game over scene
if self._stack_player_one and self._current_piece_player_one.collide(self._stack_player_one):
self._well_full = True
SoundManager.stop_theme_music_multi()
self._game_over_modal.show(False)
def _update_piece_player_two(self) -> None:
if self._current_piece_player_two is not None:
# get opponent piece from server and modify for current client
opponent_piece = MultiplayerService.try_receive_piece()
if opponent_piece is not None:
for square in opponent_piece.points:
for vertex in square:
vertex[0] += 400
opponent_piece.center[0] += 400
# load opponent piece into game
self._current_piece_player_two._points = opponent_piece.points
self._current_piece_player_two._center = opponent_piece.center
self._current_piece_player_two._base_color = opponent_piece.base_color.replace("player-1", "player-2")
if opponent_piece.inner_color is not None:
self._current_piece_player_two._inner_color = opponent_piece.inner_color.replace("player-1", "player-2")
else:
self._current_piece_player_two._inner_color = None
self._current_piece_player_two._outer_color = opponent_piece.outer_color.replace("player-1", "player-2")
else:
self._current_piece_player_two = PieceGenerator.get_opponent_piece(None, None, None)
def _update_stack_player_two(self) -> None:
if self._stack_player_two is not None:
# get opponent stack from server and modify for current client
opponent_stack = MultiplayerService.try_receive_stack()
if opponent_stack is not None:
for square in opponent_stack.points:
for vertex in square:
vertex[0] += 400
# load opponent stack into game
self._stack_player_two._points = opponent_stack.points
self._stack_player_two._square_colors = [
SquareColor(x["base_color"].replace("player-1", "player-2"),\
x["inner_color"].replace("player-1", "player-2") if x["inner_color"] else None,\
x["outer_color"].replace("player-1", "player-2"))
for x in opponent_stack.square_colors
]
def _get_level_player_one(self) -> int:
return 0 if self._stack_player_one is None else self._stack_player_one.total_lines // self._lines_per_level
def _get_level_player_two(self) -> int:
return 0 if self._stack_player_two is None else self._stack_player_two.total_lines // self._lines_per_level
def _clear_current_piece_player_one(self) -> None:
self._current_piece_player_one = None
def _clear_current_piece_player_two(self) -> None:
self._current_piece_player_two = None

211
frontend/tetri5/util.py Normal file
View File

@@ -0,0 +1,211 @@
import sys
sys.path.append("__pypackages__/3.12/lib/")
import yaml # non-native package
import pygame
from typing import Tuple
"""
TODO description
"""
class ConfigurationManager:
CONFIG_FILE_LOCATION = "data/config.yaml"
_configuration = []
@classmethod
def init(cls) -> None:
with open(cls.CONFIG_FILE_LOCATION, "r") as yaml_file:
cls._configuration = yaml.safe_load(yaml_file)
@classmethod
def get(cls, key: str, sub_key: str = None):
if sub_key:
return cls._configuration[key][sub_key]
else:
return cls._configuration[key]
"""
TODO description
"""
class SoundManager:
# Channels
_theme_music_ch = None
_option_change_sfx_ch = None
_piece_rotate_sfx_ch = None
_piece_set_sfx_ch = None
_line_complete_sfx_ch = None
_four_lines_complete_sfx_ch = None
# Sounds
_theme_music = None
_option_change_sfx = None
_piece_rotate_sfx = None
_piece_set_sfx = None
_line_complete_sfx = None
_four_lines_complete_sfx = None
@classmethod
def init(cls) -> None:
pygame.mixer.init()
cls._theme_music_single_ch = pygame.mixer.Channel(0)
cls._theme_music_multi_ch = pygame.mixer.Channel(1)
cls._option_change_sfx_ch = pygame.mixer.Channel(2)
cls._piece_rotate_sfx_ch = pygame.mixer.Channel(3)
cls._piece_set_sfx_ch = pygame.mixer.Channel(4)
cls._line_complete_sfx_ch = pygame.mixer.Channel(5)
cls._four_lines_complete_sfx_ch = pygame.mixer.Channel(6)
cls._level_up_sfx_ch = pygame.mixer.Channel(7)
cls._theme_music_single = pygame.mixer.Sound(ConfigurationManager.get("sound", "theme-music-single"))
cls._theme_music_multi = pygame.mixer.Sound(ConfigurationManager.get("sound", "theme-music-multi"))
cls._option_change_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "option-change"))
cls._piece_rotate_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "piece-rotate"))
cls._piece_set_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "piece-set"))
cls._line_complete_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "line-complete"))
cls._four_lines_complete_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "four-lines-complete"))
cls._level_up_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "level-up"))
cls._game_over_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "game-over"))
cls._you_win_sfx = pygame.mixer.Sound(ConfigurationManager.get("sound", "you-win"))
@classmethod
def play_theme_music_single(cls) -> None:
cls._theme_music_single_ch.set_volume(0.7)
cls._theme_music_single_ch.play(cls._theme_music_single, -1)
@classmethod
def stop_theme_music_single(cls) -> None:
cls._theme_music_single_ch.stop()
@classmethod
def play_theme_music_multi(cls) -> None:
cls._theme_music_multi_ch.set_volume(0.7)
cls._theme_music_multi_ch.play(cls._theme_music_multi, -1)
@classmethod
def stop_theme_music_multi(cls) -> None:
cls._theme_music_multi_ch.stop()
@classmethod
def play_option_change_sfx(cls) -> None:
cls._option_change_sfx_ch.set_volume(0.8)
cls._option_change_sfx_ch.play(cls._option_change_sfx)
@classmethod
def play_piece_rotate_sfx(cls) -> None:
cls._piece_rotate_sfx_ch.set_volume(0.7)
cls._piece_rotate_sfx_ch.play(cls._piece_rotate_sfx)
@classmethod
def play_piece_set_sfx(cls) -> None:
cls._piece_set_sfx_ch.play(cls._piece_set_sfx)
@classmethod
def play_line_complete_sfx(cls) -> None:
cls._line_complete_sfx_ch.play(cls._line_complete_sfx)
@classmethod
def play_four_lines_complete_sfx(cls) -> None:
cls._four_lines_complete_sfx_ch.play(cls._four_lines_complete_sfx)
@classmethod
def play_level_up_sfx(cls) -> None:
cls._level_up_sfx_ch.play(cls._level_up_sfx)
@classmethod
def play_game_over_sfx(cls) -> None:
cls._game_over_sfx.set_volume(0.7)
cls._game_over_sfx.play()
@classmethod
def play_you_win_sfx(cls) -> None:
cls._you_win_sfx.set_volume(0.7)
cls._you_win_sfx.play()
"""
TODO description
"""
class TextGenerator:
_sheet = None
_glyph_size = (-1, -1)
_characters = { }
@classmethod
def init(cls, file: str, glyph_size: Tuple) -> None:
cls._sheet = pygame.image.load(file)
cls._glyph_size = glyph_size
# load character positions in bitmap into the characters dictionary
# letters
cls._characters["A"] = (9 * glyph_size[0], 2 * glyph_size[1])
cls._characters["B"] = (10 * glyph_size[0], 2 * glyph_size[1])
cls._characters["C"] = (11 * glyph_size[0], 2 * glyph_size[1])
cls._characters["D"] = (0 * glyph_size[0], 3 * glyph_size[1])
cls._characters["E"] = (1 * glyph_size[0], 3 * glyph_size[1])
cls._characters["F"] = (2 * glyph_size[0], 3 * glyph_size[1])
cls._characters["G"] = (3 * glyph_size[0], 3 * glyph_size[1])
cls._characters["H"] = (4 * glyph_size[0], 3 * glyph_size[1])
cls._characters["I"] = (5 * glyph_size[0], 3 * glyph_size[1])
cls._characters["J"] = (6 * glyph_size[0], 3 * glyph_size[1])
cls._characters["K"] = (7 * glyph_size[0], 3 * glyph_size[1])
cls._characters["L"] = (8 * glyph_size[0], 3 * glyph_size[1])
cls._characters["M"] = (9 * glyph_size[0], 3 * glyph_size[1])
cls._characters["N"] = (10 * glyph_size[0], 3 * glyph_size[1])
cls._characters["O"] = (11 * glyph_size[0], 3 * glyph_size[1])
cls._characters["P"] = (0 * glyph_size[0], 4 * glyph_size[1])
cls._characters["Q"] = (1 * glyph_size[0], 4 * glyph_size[1])
cls._characters["R"] = (2 * glyph_size[0], 4 * glyph_size[1])
cls._characters["S"] = (3 * glyph_size[0], 4 * glyph_size[1])
cls._characters["T"] = (4 * glyph_size[0], 4 * glyph_size[1])
cls._characters["U"] = (5 * glyph_size[0], 4 * glyph_size[1])
cls._characters["V"] = (6 * glyph_size[0], 4 * glyph_size[1])
cls._characters["W"] = (7 * glyph_size[0], 4 * glyph_size[1])
cls._characters["X"] = (8 * glyph_size[0], 4 * glyph_size[1])
cls._characters["Y"] = (9 * glyph_size[0], 4 * glyph_size[1])
cls._characters["Z"] = (10 * glyph_size[0], 4 * glyph_size[1])
# numbers
cls._characters["0"] = (4 * glyph_size[0], 1 * glyph_size[1])
cls._characters["1"] = (5 * glyph_size[0], 1 * glyph_size[1])
cls._characters["2"] = (6 * glyph_size[0], 1 * glyph_size[1])
cls._characters["3"] = (7 * glyph_size[0], 1 * glyph_size[1])
cls._characters["4"] = (8 * glyph_size[0], 1 * glyph_size[1])
cls._characters["5"] = (9 * glyph_size[0], 1 * glyph_size[1])
cls._characters["6"] = (10 * glyph_size[0], 1 * glyph_size[1])
cls._characters["7"] = (11 * glyph_size[0], 1 * glyph_size[1])
cls._characters["8"] = (0 * glyph_size[0], 2 * glyph_size[1])
cls._characters["9"] = (1 * glyph_size[0], 2 * glyph_size[1])
# symbols
cls._characters[":"] = (2 * glyph_size[0], 11 * glyph_size[1])
cls._characters["_"] = (3 * glyph_size[0], 5 * glyph_size[1])
cls._characters["."] = (2 * glyph_size[0], 1 * glyph_size[1])
cls._characters["!"] = (1 * glyph_size[0], 0 * glyph_size[1])
@classmethod
def draw(cls, text: str, position: Tuple, surface: pygame.Surface) -> None:
x_position = 0
for char_ in text:
if not char_.isspace():
surface.blit(cls._sheet, (position[0] + x_position, position[1]), pygame.Rect(cls._characters[char_.upper()], (cls._glyph_size[0], cls._glyph_size[1])))
x_position += cls._glyph_size[0]
"""
TODO description
"""
class Controller:
_keys_pressed = {}
@classmethod
def key_down(cls, key: int) -> bool:
prior_pressed_state = False if key not in cls._keys_pressed else cls._keys_pressed[key]
cls._keys_pressed[key] = pygame.key.get_pressed()[key]
return cls._keys_pressed[key] and not prior_pressed_state
@classmethod
def key_pressed(cls, key: int) -> bool:
return pygame.key.get_pressed()[key]