基本
- 判断是ipv4、ipv6
class IPvStatus():
def __init__(self, ip):
self.ip = ip
def ipv4(self):
try:
socket.inet_pton(socket.AF_INET, self.ip)
except AttributeError:
try:
socket.inet_aton(self.ip)
except socket.error:
return False
return self.ip.count('.') == 3
except socket.error:
return False
return True
def ipv6(self):
try:
socket.inet_pton(socket.AF_INET6, self.ip)
except socket.error:
return False
return True
def is_ip(self): # 是否是ip
if self.ipv4() or self.ipv6():
return True
else:
return False
客户端
import json
import struct
host = "127.0.0.1"
ip_status = IPvStatus(host)
if ip_status.ipv6():
# ipv6
mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
port = 4256
else:
# ipv4
mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = 5506
try:
mySocket.connect((host, port))
except:
pass
dic_data = {}
# 数据总长度
size = len(json.dumps(dic_data).encode("utf-8"))
# 模拟报头
f = struct.pack("l", size)
# 先发送报头
mySocket.send(f)
# 再发数据
mySocket.send(json.dumps(dic_data).encode("utf-8"))
msg = mySocket.recv(10240)
if msg:
msg = json.loads(str(msg, encoding="utf-8"))
mySocket.close()
else:
mySocket.close()
服务端
- ipv4
import socket
import json
import struct
# 创建套接字
mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置IP和端口
host = '0.0.0.0'
port = 5506
block_size = 512
recv_data_size = 0
# bind绑定该端口
mySocket.bind((host, port))
# 监听
mySocket.listen(10)
while True:
client, address = mySocket.accept()
while True:
# 接受定长报头
data_header = client.recv(struct.calcsize("l"))
total_size = struct.unpack("l", data_header)
# 真正数据总长度
size = total_size[0]
data_bytes = b''
while recv_data_size < size:
data = client.recv(block_size)
data_bytes += data
recv_data_size += len(data)
if data_bytes:
data_dic = str(data_bytes, encoding='utf-8')
# 真实数据
data_dic = json.loads(data_dic)
break
- ipv6
import socket
import json
import struct
# 创建套接字
mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# 设置IP和端口
host = ''
port = 4256
block_size = 512
recv_data_size = 0
# bind绑定该端口
mySocket.bind((host, port))
# 监听
mySocket.listen(10)
while True:
client, address = mySocket.accept()
while True:
# 接受定长报头
data_header = client.recv(struct.calcsize("l"))
total_size = struct.unpack("l", data_header)
# 真正数据总长度
size = total_size[0]
data_bytes = b''
while recv_data_size < size:
data = client.recv(block_size)
data_bytes += data
recv_data_size += len(data)
if data_bytes:
data_dic = str(data_bytes, encoding='utf-8')
# 真实数据
data_dic = json.loads(data_dic)
break
注意:
ipv6服务端的host绑定空字符串
- 优化完整代码
import socket
import json
import struct
class SocketMain():
def __init__(self, ipv4_port=5506, ipv6_port=4256, block_size=512, host=None, port=None):
self.ipv4_host = "127.0.0.1"
self.ipv6_host = ""
self.ipv4_port = ipv4_port
self.ipv6_port = ipv6_port
self.block_size = block_size
self.recv_data_size = 0
self.host = host
self.port = port
# 服务端
def socket_server(self, is_ipv4=True):
if is_ipv4:
host = self.ipv4_host
port = self.ipv4_port
mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
host = self.ipv6_host
port = self.ipv6_port
mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
mySocket.bind((host, port))
# 监听
mySocket.listen(10)
while True:
client, address = mySocket.accept()
while True:
# 接受定长报头
data_header = client.recv(struct.calcsize("l"))
total_size = struct.unpack("l", data_header)
# 真正数据总长度
size = total_size[0]
data_bytes = b''
while self.recv_data_size < size:
data = client.recv(self.block_size)
data_bytes += data
self.recv_data_size += len(data)
if data_bytes:
data_dic = str(data_bytes, encoding='utf-8')
# 真实数据
data_dic = json.loads(data_dic)
if data_dic:
return data_dic
break
# 客户端
def Socket_client(self):
if not (self.host and self.host):
return False
ip_status = IPvStatus(self.host)
if ip_status.ipv6():
# ipv6
mySocket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
port = 4256
else:
# ipv4
mySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = 5506
try:
mySocket.connect((self.host, port))
except:
pass
dic_data = {}
# 数据总长度
size = len(json.dumps(dic_data).encode("utf-8"))
# 模拟报头
f = struct.pack("l", size)
# 先发送报头
mySocket.send(f)
# 再发数据
mySocket.send(json.dumps(dic_data).encode("utf-8"))
msg = mySocket.recv(self.block_size)
if msg:
msg = json.loads(str(msg, encoding="utf-8"))
mySocket.close()
else:
mySocket.close()
正文完
可以使用微信扫码关注公众号(ID:xzluomor)