The TLA+ Home Page
Mathematical language for specifying and verifying concurrent and distributed systems:
What is TLA+:#
Core Purpose:#
- Formal Specification: Precisely describe system behavior mathematically
- Model Checking: Automatically verify system properties
- Bug Prevention: Find design flaws before implementation
- Documentation: Unambiguous system specification
TLA+ Components:#
---- MODULE BankTransfer ----
EXTENDS Integers
VARIABLES account1, account2
Init == account1 = 100 /\ account2 = 50
Transfer(amount) ==
/\ amount > 0
/\ account1 >= amount
/\ account1' = account1 - amount
/\ account2' = account2 + amount
Next == \E amount \in 1..account1 : Transfer(amount)
Spec == Init /\ [][Next]_<<account1, account2>>
\* Safety property: total money conserved
MoneyConserved == account1 + account2 = 150
\* Liveness property: money can be transferred
CanTransfer == <>(account1 /= 100)
====
Real-World Applications:#
Distributed System Modeling:#
---- MODULE DistributedConsensus ----
EXTENDS Integers, Sequences, FiniteSets
CONSTANTS Nodes, Values, Nil
ASSUME /\ Nodes # {}
/\ Values # {}
VARIABLES
\* Node state
state, \* [node -> "follower" | "candidate" | "leader"]
currentTerm, \* [node -> Nat]
votedFor, \* [node -> node | Nil]
log, \* [node -> sequence of entries]
\* Leader state
nextIndex, \* [leader -> [follower -> Nat]]
matchIndex \* [leader -> [follower -> Nat]]
Init ==
/\ state = [n \in Nodes |-> "follower"]
/\ currentTerm = [n \in Nodes |-> 0]
/\ votedFor = [n \in Nodes |-> Nil]
/\ log = [n \in Nodes |-> <<>>]
/\ nextIndex = [n \in Nodes |-> [m \in Nodes |-> 1]]
/\ matchIndex = [n \in Nodes |-> [m \in Nodes |-> 0]]
\* Leader election
StartElection(n) ==
/\ state[n] = "follower"
/\ state' = [state EXCEPT ![n] = "candidate"]
/\ currentTerm' = [currentTerm EXCEPT ![n] = @ + 1]
/\ votedFor' = [votedFor EXCEPT ![n] = n]
\* Send RequestVote RPCs to other nodes
/\ UNCHANGED <<log, nextIndex, matchIndex>>
\* Safety invariant: at most one leader per term
LeaderSafety ==
\A t \in Nat :
Cardinality({n \in Nodes : state[n] = "leader" /\ currentTerm[n] = t}) <= 1
====
Model Checking with TLC:#
Property Verification:#
\* Temporal Logic properties
PROPERTY MoneyConserved \* Always true
PROPERTY <>CanTransfer \* Eventually true
PROPERTY []RespondsToRequests \* Always responds
\* Model checking configuration
SPECIFICATION Spec
INVARIANT MoneyConserved
PROPERTY <>CanTransfer
\* State space constraints
CONSTANTS
MaxTransfer = 10
MaxAccounts = 3
Example Bug Discovery:#
\* Before fix: race condition in concurrent transfer
Transfer(amount) ==
/\ amount > 0
/\ account1 >= amount \* Race: check and update not atomic
/\ account1' = account1 - amount
/\ account2' = account2 + amount
\* TLC finds: account1 can go negative with concurrent transfers
\* After fix: atomic check-and-update
Transfer(amount) ==
/\ amount > 0
/\ account1 >= amount
/\ account1' = account1 - amount
/\ account2' = account2 + amount
/\ account1' >= 0 \* Explicit constraint
Benefits for Software Engineering:#
Design Validation:#
- Early Bug Detection: Find issues before coding
- Corner Case Discovery: Explore all possible execution paths
- Architecture Verification: Ensure system meets requirements
- Team Communication: Precise specification reduces ambiguity
Industry Adoption:#
- Amazon: Uses TLA+ for AWS services (S3, DynamoDB)
- Microsoft: Azure Cosmos DB specifications
- MongoDB: Replication protocol verification
- Uber: Distributed system design validation
Statecharts for Redux Applications#
How to model the behavior of Redux apps using statecharts
Welcome to the world of Statecharts
Powerful visual formalism for modeling complex application state:
Problems with Traditional State Management:#
Redux Complexity Issues:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// Traditional Redux - hard to visualize state transitions
const reducer = (state = initialState, action) => {
switch (action.type) {
case 'FETCH_START':
return { ...state, loading: true, error: null };
case 'FETCH_SUCCESS':
return { ...state, loading: false, data: action.data };
case 'FETCH_ERROR':
return { ...state, loading: false, error: action.error };
case 'RETRY':
return { ...state, loading: true, error: null, retryCount: state.retryCount + 1 };
// ... many more cases with complex interdependencies
}
};
// Hard to answer: "What states can transition to what other states?"
// Implicit state combinations: { loading: true, error: "Network error" }
|
Statecharts Solution:#
Explicit State Modeling:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import { Machine } from 'xstate';
const fetchMachine = Machine({
id: 'fetch',
initial: 'idle',
states: {
idle: {
on: {
FETCH: 'loading'
}
},
loading: {
on: {
SUCCESS: 'success',
ERROR: 'error'
}
},
success: {
on: {
FETCH: 'loading',
RESET: 'idle'
}
},
error: {
on: {
RETRY: 'loading',
RESET: 'idle'
}
}
}
});
// Clear state transitions, impossible states eliminated
// Visual representation possible
|
Hierarchical States:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
const userMachine = Machine({
id: 'user',
initial: 'authenticated',
states: {
authenticated: {
initial: 'idle',
states: {
idle: {
on: {
EDIT_PROFILE: 'editingProfile',
CHANGE_PASSWORD: 'changingPassword'
}
},
editingProfile: {
on: {
SAVE: 'saving',
CANCEL: 'idle'
}
},
changingPassword: {
on: {
SAVE: 'saving',
CANCEL: 'idle'
}
},
saving: {
on: {
SUCCESS: 'idle',
ERROR: 'error'
}
},
error: {
on: {
RETRY: 'saving',
DISMISS: 'idle'
}
}
},
on: {
LOGOUT: 'unauthenticated'
}
},
unauthenticated: {
on: {
LOGIN: 'authenticated'
}
}
}
});
|
Advanced Statechart Features:#
Parallel States:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
const mediPlayerMachine = Machine({
id: 'mediaPlayer',
type: 'parallel',
states: {
// Independent state regions
playback: {
initial: 'stopped',
states: {
stopped: { on: { PLAY: 'playing' } },
playing: { on: { PAUSE: 'paused', STOP: 'stopped' } },
paused: { on: { PLAY: 'playing', STOP: 'stopped' } }
}
},
volume: {
initial: 'normal',
states: {
muted: { on: { UNMUTE: 'normal' } },
normal: { on: { MUTE: 'muted' } }
}
},
fullscreen: {
initial: 'windowed',
states: {
windowed: { on: { FULLSCREEN: 'fullscreen' } },
fullscreen: { on: { EXIT_FULLSCREEN: 'windowed' } }
}
}
}
});
// Can be: { playback: 'playing', volume: 'muted', fullscreen: 'windowed' }
|
Guards and Actions:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
const authMachine = Machine({
id: 'auth',
initial: 'idle',
context: {
user: null,
retries: 0,
maxRetries: 3
},
states: {
idle: {
on: {
LOGIN: {
target: 'authenticating',
actions: ['clearError']
}
}
},
authenticating: {
invoke: {
src: 'authenticate',
onDone: {
target: 'authenticated',
actions: ['setUser']
},
onError: [
{
target: 'error',
cond: 'maxRetriesReached',
actions: ['setError']
},
{
target: 'idle',
actions: ['incrementRetries', 'setError']
}
]
}
}
}
}, {
guards: {
maxRetriesReached: (context) => context.retries >= context.maxRetries
},
actions: {
setUser: (context, event) => ({ ...context, user: event.data }),
setError: (context, event) => ({ ...context, error: event.data }),
incrementRetries: (context) => ({ ...context, retries: context.retries + 1 }),
clearError: (context) => ({ ...context, error: null })
}
});
|
HashiCorp Nomad - Workload Orchestrator#
Nomad by HashiCorp
Simple, flexible workload orchestrator for deploying applications:
Core Concepts:#
Job Specification:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
job "web-app" {
datacenters = ["dc1"]
type = "service"
group "web" {
count = 3
# Placement constraints
constraint {
attribute = "${node.class}"
value = "web-servers"
}
# Resource requirements
network {
port "http" {
static = 8080
}
}
task "frontend" {
driver = "docker"
config {
image = "nginx:latest"
ports = ["http"]
mount {
type = "bind"
source = "local/nginx.conf"
target = "/etc/nginx/nginx.conf"
}
}
# Resource allocation
resources {
cpu = 500 # MHz
memory = 256 # MB
}
# Health checking
service {
name = "web-frontend"
port = "http"
check {
type = "http"
path = "/health"
interval = "10s"
timeout = "3s"
}
}
}
}
}
|
Multi-Driver Support:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# Docker containers
task "api" {
driver = "docker"
config {
image = "myapp:latest"
ports = ["http"]
}
}
# Raw executables
task "worker" {
driver = "exec"
config {
command = "/usr/local/bin/worker"
args = ["--config", "local/worker.conf"]
}
}
# Java applications
task "service" {
driver = "java"
config {
jar_path = "local/service.jar"
args = ["-Xmx1024m"]
}
}
# QEMU virtual machines
task "legacy" {
driver = "qemu"
config {
image_path = "local/legacy-system.qcow2"
accelerator = "kvm"
}
}
|
Advanced Features:#
Service Discovery:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
service {
name = "database"
port = "db"
# Consul integration
tags = ["primary", "v1.2.3"]
# Health checks
check {
type = "tcp"
interval = "10s"
timeout = "3s"
}
# Load balancer registration
connect {
sidecar_service {}
}
}
# Template rendering with service discovery
template {
data = <<EOF
{{range services}}
{{.Name}}: {{range .Tags}}{{.}}{{end}}
{{range service .Name}}
{{.Address}}:{{.Port}}
{{end}}
{{end}}
EOF
destination = "local/services.conf"
change_mode = "restart"
}
|
Volume Management:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# Host volumes
group "database" {
volume "data" {
type = "host"
source = "database-vol"
}
task "postgres" {
volume_mount {
volume = "data"
destination = "/var/lib/postgresql/data"
}
}
}
# CSI volumes (external storage)
volume "shared-data" {
type = "csi"
plugin_id = "aws-ebs"
source = "vol-12345"
access_mode = "multi-node-multi-writer"
attachment_mode = "file-system"
}
|
Operational Features:#
Rolling Updates:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
group "app" {
count = 5
update {
max_parallel = 2
min_healthy_time = "30s"
healthy_deadline = "5m"
progress_deadline = "10m"
auto_revert = true
auto_promote = false
}
}
# Blue/green deployments
job "api" {
update {
canary = 2
auto_promote = false
auto_revert = true
}
}
|
Resource Management:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# Resource scheduling
resources {
cpu = 1000 # MHz
memory = 512 # MB
disk = 1024 # MB
# Network bandwidth
network {
mbits = 100
}
}
# Device constraints (GPUs, etc.)
device "nvidia/gpu" {
count = 1
constraint {
attribute = "${device.attr.memory}"
operator = ">="
value = "4096"
}
}
|
Python Socket Programming#
Socket Programming HOWTO โ Python documentation
Socket Programming in Python (Guide) โ Real Python
Comprehensive guide to network programming with Python sockets:
Socket Fundamentals:#
Basic TCP Client/Server:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# TCP Server
import socket
def tcp_server():
# Create socket object
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind to address and port
host = 'localhost'
port = 8080
server_socket.bind((host, port))
# Listen for connections
server_socket.listen(5)
print(f"Server listening on {host}:{port}")
while True:
# Accept connection
client_socket, address = server_socket.accept()
print(f"Connection from {address}")
try:
# Receive data
data = client_socket.recv(1024).decode('utf-8')
print(f"Received: {data}")
# Send response
response = f"Echo: {data}"
client_socket.send(response.encode('utf-8'))
finally:
client_socket.close()
# TCP Client
def tcp_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Connect to server
client_socket.connect(('localhost', 8080))
# Send data
message = "Hello, server!"
client_socket.send(message.encode('utf-8'))
# Receive response
response = client_socket.recv(1024).decode('utf-8')
print(f"Server response: {response}")
finally:
client_socket.close()
|
UDP Communication:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# UDP Server
def udp_server():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('localhost', 8081))
print("UDP Server listening on localhost:8081")
while True:
# Receive data (no connection needed)
data, address = server_socket.recvfrom(1024)
print(f"Received from {address}: {data.decode('utf-8')}")
# Send response back to sender
response = f"UDP Echo: {data.decode('utf-8')}"
server_socket.sendto(response.encode('utf-8'), address)
# UDP Client
def udp_client():
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# Send data (no connection needed)
message = "Hello, UDP server!"
client_socket.sendto(message.encode('utf-8'), ('localhost', 8081))
# Receive response
response, server_address = client_socket.recvfrom(1024)
print(f"Server response: {response.decode('utf-8')}")
finally:
client_socket.close()
|
Advanced Socket Programming:#
Multithreaded Server:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import threading
import socket
class ThreadedTCPServer:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def handle_client(self, client_socket, address):
"""Handle individual client connection."""
print(f"New client connected: {address}")
try:
while True:
data = client_socket.recv(1024)
if not data:
break
# Echo data back to client
response = f"Echo: {data.decode('utf-8')}"
client_socket.send(response.encode('utf-8'))
except ConnectionResetError:
print(f"Client {address} disconnected")
finally:
client_socket.close()
print(f"Connection with {address} closed")
def start(self):
"""Start the server and accept connections."""
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"Server listening on {self.host}:{self.port}")
try:
while True:
client_socket, address = self.server_socket.accept()
# Create new thread for each client
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nShutting down server...")
finally:
self.server_socket.close()
# Usage
if __name__ == "__main__":
server = ThreadedTCPServer()
server.start()
|
Asynchronous Sockets with asyncio:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
import asyncio
async def echo_server():
"""Async TCP echo server."""
async def handle_client(reader, writer):
address = writer.get_extra_info('peername')
print(f"Client connected: {address}")
try:
while True:
# Read data
data = await reader.read(1024)
if not data:
break
message = data.decode('utf-8')
print(f"Received from {address}: {message}")
# Write response
response = f"Echo: {message}"
writer.write(response.encode('utf-8'))
await writer.drain()
except asyncio.CancelledError:
pass
finally:
writer.close()
await writer.wait_closed()
print(f"Client {address} disconnected")
# Start server
server = await asyncio.start_server(handle_client, 'localhost', 8080)
address = server.sockets[0].getsockname()
print(f"Async server running on {address[0]}:{address[1]}")
async with server:
await server.serve_forever()
# Client
async def async_client():
reader, writer = await asyncio.open_connection('localhost', 8080)
try:
# Send messages
messages = ["Hello", "How are you?", "Goodbye"]
for message in messages:
writer.write(message.encode('utf-8'))
await writer.drain()
# Read response
data = await reader.read(1024)
response = data.decode('utf-8')
print(f"Server: {response}")
await asyncio.sleep(1)
finally:
writer.close()
await writer.wait_closed()
# Run async server
if __name__ == "__main__":
asyncio.run(echo_server())
|
Socket Error Handling:#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
import socket
import errno
import time
def robust_client(host, port, max_retries=3):
"""Client with comprehensive error handling."""
for attempt in range(max_retries):
try:
# Create socket with timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10.0) # 10 second timeout
# Connect
sock.connect((host, port))
print(f"Connected to {host}:{port}")
# Send data
message = "Hello, server!"
sock.send(message.encode('utf-8'))
# Receive response
response = sock.recv(1024).decode('utf-8')
print(f"Response: {response}")
return response
except socket.timeout:
print(f"Attempt {attempt + 1}: Connection timeout")
except socket.gaierror as e:
print(f"Address resolution error: {e}")
break # Don't retry on DNS errors
except ConnectionRefusedError:
print(f"Attempt {attempt + 1}: Connection refused")
except OSError as e:
if e.errno == errno.ENETUNREACH:
print(f"Network unreachable: {e}")
break
else:
print(f"OS error: {e}")
finally:
if 'sock' in locals():
sock.close()
# Wait before retry
if attempt < max_retries - 1:
time.sleep(2)
print(f"Failed to connect after {max_retries} attempts")
return None
|
These tools and concepts represent different aspects of system design and implementation - formal verification for correctness, visual state modeling for complex applications, flexible workload orchestration, and robust network programming fundamentals.